[jira] [Commented] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation

2020-01-01 Thread Jason Guo (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006571#comment-17006571
 ] 

Jason Guo commented on SPARK-24906:
---

[~lio...@taboola.com]

Yes, estimating with sampling does better than the original proposal for complex 
schemas. The solution we are using is very similar to the one you proposed, and, 
as you pointed out, it requires a format-specific implementation. We estimate 
based on row groups (the compressed size of each column chunk) for Parquet and 
based on stripes (the uncompressed size of each column) for ORC.
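
For Parquet, such an estimate can be read directly from the file footer. A minimal 
sketch of the idea only (not the actual patch; the sampled file path and the 
required column set are placeholders):
{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// Fraction of the on-disk (compressed) bytes occupied by the required columns,
// estimated from the row-group metadata of one sampled file.
def requiredBytesFraction(file: String, requiredColumns: Set[String]): Double = {
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(file), new Configuration()))
  try {
    val chunks = reader.getFooter.getBlocks.asScala.flatMap(_.getColumns.asScala)
    val total  = chunks.map(_.getTotalSize).sum.toDouble
    val needed = chunks
      .filter(c => requiredColumns.contains(c.getPath.toDotString))
      .map(_.getTotalSize).sum.toDouble
    if (total == 0) 1.0 else needed / total
  } finally reader.close()
}

// The split size can then be enlarged to roughly maxPartitionBytes / fraction.
{code}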

> Adaptively set split size for columnar file to ensure the task read data size 
> fit expectation
> -
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Major
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files, when Spark SQL reads a table, each split will be 128 MB by 
> default since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when 
> the user sets it to a larger value, such as 512 MB, a task may read only a few 
> MB or even hundreds of KB, because the table (Parquet) may consist of dozens of 
> columns while the query needs only a few of them, and Spark prunes the 
> unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively.
> For example, suppose there are 40 columns: 20 integers and 20 longs. When a 
> query touches only one integer column and one long column, maxPartitionBytes 
> should be 20 times larger, since (20*4 + 20*8) / (4 + 8) = 20. A sketch of this 
> computation is shown below.
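> A rough sketch of that computation, assuming the factor is derived from the 
> logical schema using Catalyst's per-type default sizes (names are illustrative):
> {code:scala}
> import org.apache.spark.sql.types.StructType
> 
> // Hypothetical helper: ratio of the full row width to the width of the
> // required columns, using DataType.defaultSize as the per-column weight.
> def enlargeFactor(schema: StructType, required: Seq[String]): Double = {
>   val total = schema.map(_.dataType.defaultSize).sum.toDouble
>   val read  = schema.filter(f => required.contains(f.name))
>                     .map(_.dataType.defaultSize).sum
>   if (read == 0) 1.0 else total / read
> }
> // 20 INT + 20 BIGINT columns, reading one of each: (20*4 + 20*8) / (4 + 8) = 20
> {code}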
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the resource manager's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and receives more than 5 TB of 
> data each hour.
> When we issue a very simple query
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72,176 tasks and the job takes 4.8 minutes.
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB of data.
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1,615 tasks and the job lasts only 30 
> seconds, almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median read size is 44.2 MB.
> !image-2018-07-24-20-30-24-552.png!
>  






[jira] [Commented] (SPARK-29031) Materialized column to accelerate queries

2019-09-15 Thread Jason Guo (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930230#comment-16930230
 ] 

Jason Guo commented on SPARK-29031:
---

[~lishuming] `Materialized column` is supported in ClickHouse: 
[https://clickhouse.yandex/docs/en/query_language/create/]

> Materialized column to accelerate queries
> -
>
> Key: SPARK-29031
> URL: https://issues.apache.org/jira/browse/SPARK-29031
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Guo
>Priority: Major
>  Labels: SPIP
>
> Goals
>  * Add new SQL grammar for materialized columns
>  * Implicitly rewrite SQL queries on complex-typed columns if there is a 
> materialized column for them
>  * If the data type of the materialized column is atomic, even though the 
> origin column is of a complex type, enable vectorized read and filter pushdown 
> to improve performance
> Example
> Create a normal table
> {quote}CREATE TABLE x (
> name STRING,
> age INT,
> params STRING,
> event MAP<STRING, STRING>
> ) USING parquet;
> {quote}
>  
> Add materialized columns to an existing table
> {quote}ALTER TABLE x ADD COLUMNS (
> new_age INT MATERIALIZED age + 1,
> city STRING MATERIALIZED get_json_object(params, '$.city'),
> label STRING MATERIALIZED event['label']
> );
> {quote}
>  
> When a query like the one below is issued
> {quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label']
> FROM x
> WHERE event['label'] = 'newuser';
> {quote}
> It's equivalent to
> {quote}SELECT name, new_age, city, label
> FROM x
> WHERE label = 'newuser';
> {quote}
>  
> The query performance improves dramatically because:
>  # The rewritten query reads the new columns (plain strings) instead of the 
> whole params string and event map, so much less data needs to be read
>  # Vectorized read can be used in the new query but not in the old one, because 
> vectorized read can only be enabled when all required columns are of atomic 
> types
>  # The filter can be pushed down. Only filters on atomic columns can be pushed 
> down; the original filter event['label'] = 'newuser' is on a complex column, so 
> it cannot be pushed down.
>  # The new query no longer needs to parse JSON. JSON parsing is a CPU-intensive 
> operation that hurts performance dramatically
>  
>  
>  
>  
>  






[jira] [Updated] (SPARK-29031) Materialized column to accelerate queries

2019-09-10 Thread Jason Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-29031:
--
Description: 
Goals
 * Add new SQL grammar for materialized columns
 * Implicitly rewrite SQL queries on complex-typed columns if there is a 
materialized column for them
 * If the data type of the materialized column is atomic, even though the origin 
column is of a complex type, enable vectorized read and filter pushdown to 
improve performance

Example

Create a normal table
{quote}CREATE TABLE x (

name STRING,

age INT,

params STRING,

event MAP<STRING, STRING>

) USING parquet;
{quote}
 

Add materialized columns to an existing table
{quote}ALTER TABLE x ADD COLUMNS (

new_age INT MATERIALIZED age + 1,

city STRING MATERIALIZED get_json_object(params, '$.city'),

label STRING MATERIALIZED event['label']

);
{quote}
 

When a query like the one below is issued
{quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label']

FROM x

WHERE event['label'] = 'newuser';
{quote}
It's equivalent to
{quote}SELECT name, new_age, city, label

FROM x

WHERE label = 'newuser';
{quote}
 

The query performance improves dramatically (see the sketch after this list) 
because:
 # The rewritten query reads the new columns (plain strings) instead of the whole 
params string and event map, so much less data needs to be read
 # Vectorized read can be used in the new query but not in the old one, because 
vectorized read can only be enabled when all required columns are of atomic types
 # The filter can be pushed down. Only filters on atomic columns can be pushed 
down; the original filter event['label'] = 'newuser' is on a complex column, so 
it cannot be pushed down.
 # The new query no longer needs to parse JSON. JSON parsing is a CPU-intensive 
operation that hurts performance dramatically
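
Until such a rewrite exists, the same effect can be approximated by materializing 
the derived columns at write time with the existing DataFrame API. A rough sketch 
only (an illustration, not the proposed implementation; table names are 
assumptions):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, get_json_object}

val spark = SparkSession.builder().appName("materialized-columns-sketch").getOrCreate()

// Source table with the schema from the example above.
val raw = spark.table("x")

// Materialize the derived values once, so later queries read plain atomic
// columns (vectorized read, filter pushdown) instead of parsing JSON or
// probing the map for every row.
raw.withColumn("new_age", col("age") + 1)
   .withColumn("city", get_json_object(col("params"), "$.city"))
   .withColumn("label", col("event").getItem("label"))
   .write.format("parquet").saveAsTable("x_materialized")

// Equivalent to the rewritten query in the proposal.
spark.table("x_materialized")
  .filter(col("label") === "newuser")
  .select("name", "new_age", "city", "label")
  .show()
{code}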

 

 

 

 

 

  was:
Goals
 * Add a new SQL grammar of Materialized column
 * Implicitly rewrite SQL queries on the complex type of columns if there is a 
materialized columns for it
 * If the data type of the materialized columns is atomic type, even though the 
origin column type is in complex type, enable vectorized read and filter 
pushdown to improve performance

Example

Create a normal table
{quote}CREATE TABLE x (

name STRING,

age INT,

params STRING,

event MAP

) USING parquet;
{quote}
 

Add materialized columns to an existing table
{quote}ALTER TABLE x ADD COLUMNS (

new_age INT MATERIALIZED age + 1,

city STRING MATERIALIZED get_json_object(params, '$.city'),

label STRING MATERIALIZED event['label']

);
{quote}
 

When issue a query as below
{quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label']

FROM x

WHER event['label'] = 'newuser';
{quote}
It equals to
{quote}SELECT name, new_age, city, label 

FROM x

WHERE label = 'newuser';
{quote}
 

The query performance improved dramatically because
 # The new query (after rewritten) will read the new column city (in string 
type) instead of read the whole map of params(in map string). Much lesser data 
are need to read
 # Vectorized read can be utilized in the new query and can not be used in the 
old one. Because vectorized read can only be enabled when all required columns 
are in atomic type
 # Filter can be pushdown. Only filters on atomic column can be pushdown. The 
original filter  event['label'] = 'newuser' is on complex column, so it can not 
be pushdown.
 # The new query do not need to parse JSON any more. JSON parse is a CPU 
intensive operation which will impact performance dramatically

 

 

 

 

 


> Materialized column to accelerate queries
> -
>
> Key: SPARK-29031
> URL: https://issues.apache.org/jira/browse/SPARK-29031
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Guo
>Priority: Major
>  Labels: SPIP
>
> Goals
>  * Add a new SQL grammar of Materialized column
>  * Implicitly rewrite SQL queries on the complex type of columns if there is 
> a materialized columns for it
>  * If the data type of the materialized columns is atomic type, even though 
> the origin column type is in complex type, enable vectorized read and filter 
> pushdown to improve performance
> Example
> Create a normal table
> {quote}CREATE TABLE x (
> name STRING,
> age INT,
> params STRING,
> event MAP
> ) USING parquet;
> {quote}
>  
> Add materialized columns to an existing table
> {quote}ALTER TABLE x ADD COLUMNS (
> new_age INT MATERIALIZED age + 1,
> city STRING MATERIALIZED get_json_object(params, '$.city'),
> label STRING MATERIALIZED event['label']
> );
> {quote}
>  
> When issue a query as below
> {quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label']
> FROM 

[jira] [Created] (SPARK-29031) Materialized column to accelerate queries

2019-09-10 Thread Jason Guo (Jira)
Jason Guo created SPARK-29031:
-

 Summary: Materialized column to accelerate queries
 Key: SPARK-29031
 URL: https://issues.apache.org/jira/browse/SPARK-29031
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 3.0.0
Reporter: Jason Guo


Goals
 * Add new SQL grammar for materialized columns
 * Implicitly rewrite SQL queries on complex-typed columns if there is a 
materialized column for them
 * If the data type of the materialized column is atomic, even though the origin 
column is of a complex type, enable vectorized read and filter pushdown to 
improve performance

Example

Create a normal table
{quote}CREATE TABLE x (

name STRING,

age INT,

params STRING,

event MAP<STRING, STRING>

) USING parquet;
{quote}
 

Add materialized columns to an existing table
{quote}ALTER TABLE x ADD COLUMNS (

new_age INT MATERIALIZED age + 1,

city STRING MATERIALIZED get_json_object(params, '$.city'),

label STRING MATERIALIZED event['label']

);
{quote}
 

When a query like the one below is issued
{quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label']

FROM x

WHERE event['label'] = 'newuser';
{quote}
It is equivalent to
{quote}SELECT name, new_age, city, label 

FROM x

WHERE label = 'newuser';
{quote}
 

The query performance improves dramatically because:
 # The rewritten query reads the new columns (plain strings) instead of the whole 
params string and event map, so much less data needs to be read
 # Vectorized read can be used in the new query but not in the old one, because 
vectorized read can only be enabled when all required columns are of atomic types
 # The filter can be pushed down. Only filters on atomic columns can be pushed 
down; the original filter event['label'] = 'newuser' is on a complex column, so 
it cannot be pushed down.
 # The new query no longer needs to parse JSON. JSON parsing is a CPU-intensive 
operation that hurts performance dramatically

 

 

 

 

 






[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join

2019-06-01 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Shepherd:   (was: Dongjoon Hyun)

> SkewJoin--handle only skewed keys with broadcastjoin and other keys with 
> normal join
> 
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, 
> time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Scenario*
>  * A big table (big_skewed) that contains a few skewed keys
>  * A small table (small_even) that has no skewed keys and is larger than the 
> broadcast threshold
>  * When big_skewed.join(small_even) runs, a few tasks will be much slower than 
> the others because they have to handle the skewed keys
> *Solution*
>  * Provide a hint to indicate which keys are skewed
>  * Handle the skewed keys with a broadcast join and join the non-skewed keys 
> with the normal join method
>  * For the small table, the whole table is larger than the broadcast threshold, 
> but the total size of its records whose keys are skewed in the big table is 
> smaller than the threshold, so those records can be joined with the big table 
> using a broadcast join
>  * The remaining records with non-skewed keys can be joined with the normal 
> join method
>  * The final result is the union of the two parts
> *Effect*
> This feature reduces the join time from 5.7 minutes to 2.1 minutes
> !time.png!
> !sql.png!  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, big_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort merge join is slow with 2 straggler tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joined with a broadcast join and the non-skewed keys are 
> joined with a sort merge join
> !skew join DAG.png!  
>  
>  






[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join

2019-05-29 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Shepherd: Dongjoon Hyun  (was: Liang-Chi Hsieh)

> SkewJoin--handle only skewed keys with broadcastjoin and other keys with 
> normal join
> 
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, 
> time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Scenario*
>  * A big table (big_skewed) that contains a few skewed keys
>  * A small table (small_even) that has no skewed keys and is larger than the 
> broadcast threshold
>  * When big_skewed.join(small_even) runs, a few tasks will be much slower than 
> the others because they have to handle the skewed keys
> *Solution*
>  * Provide a hint to indicate which keys are skewed
>  * Handle the skewed keys with a broadcast join and join the non-skewed keys 
> with the normal join method
>  * For the small table, the whole table is larger than the broadcast threshold, 
> but the total size of its records whose keys are skewed in the big table is 
> smaller than the threshold, so those records can be joined with the big table 
> using a broadcast join
>  * The remaining records with non-skewed keys can be joined with the normal 
> join method
>  * The final result is the union of the two parts
> *Effect*
> This feature reduces the join time from 5.7 minutes to 2.1 minutes
> !time.png!
> !sql.png!  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, big_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort merge join is slow with 2 straggler tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joined with a broadcast join and the non-skewed keys are 
> joined with a sort merge join
> !skew join DAG.png!  
>  
>  






[jira] [Updated] (SPARK-27865) Spark SQL support 1:N sort merge bucket join without shuffle

2019-05-29 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27865:
--
Shepherd: Dongjoon Hyun

> Spark SQL support 1:N sort merge bucket join without shuffle
> 
>
> Key: SPARK-27865
> URL: https://issues.apache.org/jira/browse/SPARK-27865
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
>
> This feature is to support 1:N bucket joins.
>  
> Without it, only bucketed tables that have the same number of buckets can be 
> joined with a sort merge bucket join. If two tables have different numbers of 
> buckets, a shuffle is needed.
>  
> This feature makes it possible to join two such bucketed tables with a sort 
> merge bucket join and no shuffle.






[jira] [Updated] (SPARK-27865) Spark SQL support 1:N sort merge bucket join without shuffle

2019-05-28 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27865:
--
Summary: Spark SQL support 1:N sort merge bucket join without shuffle  
(was: Spark SQL support 1:N sort merge bucket join)

> Spark SQL support 1:N sort merge bucket join without shuffle
> 
>
> Key: SPARK-27865
> URL: https://issues.apache.org/jira/browse/SPARK-27865
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
>
> This feature is to support 1:N bucket joins.
>  
> Without it, only bucketed tables that have the same number of buckets can be 
> joined with a sort merge bucket join. If two tables have different numbers of 
> buckets, a shuffle is needed.
>  
> This feature makes it possible to join two such bucketed tables with a sort 
> merge bucket join and no shuffle.






[jira] [Created] (SPARK-27865) Spark SQL support 1:N sort merge bucket join

2019-05-28 Thread Jason Guo (JIRA)
Jason Guo created SPARK-27865:
-

 Summary: Spark SQL support 1:N sort merge bucket join
 Key: SPARK-27865
 URL: https://issues.apache.org/jira/browse/SPARK-27865
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.3
Reporter: Jason Guo


This feature is to support 1:N bucket joins.

Without it, only bucketed tables that have the same number of buckets can be 
joined with a sort merge bucket join. If two tables have different numbers of 
buckets, a shuffle is needed.

This feature makes it possible to join two such bucketed tables with a sort merge 
bucket join and no shuffle.
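
To make the 1:N case concrete, here is a small sketch with today's API, assuming 
(as the 1:N in the title suggests) that one bucket count is an integer multiple 
of the other; the table names are illustrative:
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("one-to-n-bucket-join").getOrCreate()
import spark.implicits._

// Two bucketed, sorted tables whose bucket counts differ by a factor of 2.
spark.range(1000000).select($"id", ($"id" % 100).as("v"))
  .write.bucketBy(4, "id").sortBy("id").saveAsTable("t4")
spark.range(1000000).select($"id", ($"id" % 100).as("w"))
  .write.bucketBy(8, "id").sortBy("id").saveAsTable("t8")

// Today this join still shuffles because the bucket counts do not match;
// the proposal is to avoid that shuffle, presumably by matching each bucket
// of t4 with the corresponding buckets of t8 inside the sort merge join.
val joined = spark.table("t4").join(spark.table("t8"), "id")
joined.explain()  // the current plan contains an Exchange on at least one side
{code}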






[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Description: 
This feature is designed to handle data skew in Join

 

*Scenario*
 * A big table (big_skewed) that contains a few skewed keys
 * A small table (small_even) that has no skewed keys and is larger than the 
broadcast threshold
 * When big_skewed.join(small_even) runs, a few tasks will be much slower than 
the others because they have to handle the skewed keys

*Solution*
 * Provide a hint to indicate which keys are skewed
 * Handle the skewed keys with a broadcast join and join the non-skewed keys with 
the normal join method
 * For the small table, the whole table is larger than the broadcast threshold, 
but the total size of its records whose keys are skewed in the big table is 
smaller than the threshold, so those records can be joined with the big table 
using a broadcast join
 * The remaining records with non-skewed keys can be joined with the normal join 
method
 * The final result is the union of the two parts, as sketched in the code below
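
The plan the hint would produce can also be written out by hand with the existing 
DataFrame API. A rough sketch only (not the proposed implementation; here the 
skewed keys are listed manually instead of coming from a hint):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("manual-skew-join").getOrCreate()
import spark.implicits._

// Hypothetical skewed keys, matching the experiment below.
val skewedKeys = Seq("9500048", "9500096")

val big   = spark.table("big_skewed")
val small = spark.table("small_even")

// Skewed keys: only the matching slice of the small table is broadcast, so it
// fits under the broadcast threshold even though the whole table does not.
val skewedPart = big.filter($"id".isin(skewedKeys: _*))
  .join(broadcast(small.filter($"id".isin(skewedKeys: _*))), "id")

// Non-skewed keys: ordinary sort merge join.
val evenPart = big.filter(!($"id".isin(skewedKeys: _*)))
  .join(small.filter(!($"id".isin(skewedKeys: _*))), "id")

// The union of the two parts gives the same result as the plain join.
val result = skewedPart.union(evenPart)
{code}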

*Effect*

This feature reduces the join time from 5.7 minutes to 2.1 minutes

!time.png!

!sql.png!  

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

The big table (big_skewed) has 2 skewed keys, 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE big_skewed
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
The small table (small_even) has no skewed keys
{code:java}
INSERT OVERWRITE TABLE small_even
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select big_skewed.id, big_skewed.value, small_even.value
from big_skewed
join small_even
on small_even.id=big_skewed.id;
{code}
 

The sort merge join is slow with 2 straggler tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joined with a broadcast join and the non-skewed keys are 
joined with a sort merge join

!skew join DAG.png!  

 

 

  was:
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (big_skewed) which contains a a few skewed key
 * A small table (small_even) which has no skewed key and is larger than the 
broadcast threshold 
 * When big_skewed.join(small_even), a few tasks will be much slower than other 
tasks because they need to handle the skewed key

*Effect*

This feature reduce the join time from 5.7 minutes to 2.1 minutes

!time.png!

!sql.png!  

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE big_skewed
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE small_even
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select big_skewed.id, tabig_skewed.value, small_even.value
from big_skewed
join small_even
on small_even.id=big_skewed.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

!skew join DAG.png!  

 

 


> SkewJoin--handle only skewed keys with broadcastjoin and other keys with 
> normal join
> 
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, 
> time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (big_skewed) which contains a a few skewed key
>  * A small table (small_even) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When big_skewed.join(small_even), a few tasks will be much slower than 
> other tasks because they need to handle the skewed key
> *Solution*
>  * Provide a hint to indicate which keys are skewed keys
>  * Handle the skewed keys with broadcastjoin and join the non-skewed keys 
> with normal 

[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Shepherd: Liang-Chi Hsieh

> SkewJoin--handle only skewed keys with broadcastjoin and other keys with 
> normal join
> 
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, 
> time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (big_skewed) which contains a a few skewed key
>  * A small table (small_even) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When big_skewed.join(small_even), a few tasks will be much slower than 
> other tasks because they need to handle the skewed key
> *Effect*
> This feature reduce the join time from 5.7 minutes to 2.1 minutes
> !time.png!
> !sql.png!  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, tabig_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort merge join is slow with 2 straggle tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joint with broadcast join and the non-skewed keys are 
> joint with sort merge join
> !skew join DAG.png!  
>  
>  






[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Summary: SkewJoin--handle only skewed keys with broadcastjoin and other 
keys with normal join  (was: SkewJoin hint)

> SkewJoin--handle only skewed keys with broadcastjoin and other keys with 
> normal join
> 
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, 
> time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (big_skewed) which contains a a few skewed key
>  * A small table (small_even) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When big_skewed.join(small_even), a few tasks will be much slower than 
> other tasks because they need to handle the skewed key
> *Effect*
> This feature reduce the join time from 5.7 minutes to 2.1 minutes
> !time.png!
> !sql.png!  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, tabig_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort merge join is slow with 2 straggle tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joint with broadcast join and the non-skewed keys are 
> joint with sort merge join
> !skew join DAG.png!  
>  
>  






[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Attachment: sql.png

> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, 
> time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (big_skewed) which contains a a few skewed key
>  * A small table (small_even) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When big_skewed.join(small_even), a few tasks will be much slower than 
> other tasks because they need to handle the skewed key
> *Effect*
> This feature reduce the join time from 5.7 minutes to 2.1 minutes
> !time.png!
>  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, tabig_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort merge join is slow with 2 straggle tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joint with broadcast join and the non-skewed keys are 
> joint with sort merge join
> !skew join DAG.png!  
>  
>  






[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Description: 
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (big_skewed) which contains a a few skewed key
 * A small table (small_even) which has no skewed key and is larger than the 
broadcast threshold 
 * When big_skewed.join(small_even), a few tasks will be much slower than other 
tasks because they need to handle the skewed key

*Effect*

This feature reduce the join time from 5.7 minutes to 2.1 minutes

!time.png!

!sql.png!  

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE big_skewed
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE small_even
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select big_skewed.id, tabig_skewed.value, small_even.value
from big_skewed
join small_even
on small_even.id=big_skewed.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

!skew join DAG.png!  

 

 

  was:
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (big_skewed) which contains a a few skewed key
 * A small table (small_even) which has no skewed key and is larger than the 
broadcast threshold 
 * When big_skewed.join(small_even), a few tasks will be much slower than other 
tasks because they need to handle the skewed key

*Effect*

This feature reduce the join time from 5.7 minutes to 2.1 minutes

!time.png!

 

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE big_skewed
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE small_even
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select big_skewed.id, tabig_skewed.value, small_even.value
from big_skewed
join small_even
on small_even.id=big_skewed.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

!skew join DAG.png!  

 

 


> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, 
> time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (big_skewed) which contains a a few skewed key
>  * A small table (small_even) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When big_skewed.join(small_even), a few tasks will be much slower than 
> other tasks because they need to handle the skewed key
> *Effect*
> This feature reduce the join time from 5.7 minutes to 2.1 minutes
> !time.png!
> !sql.png!  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, tabig_skewed.value, small_even.value
> from big_skewed
> join small_even
> 

[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Description: 
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (big_skewed) which contains a a few skewed key
 * A small table (small_even) which has no skewed key and is larger than the 
broadcast threshold 
 * When big_skewed.join(small_even), a few tasks will be much slower than other 
tasks because they need to handle the skewed key

*Effect*

This feature reduce the join time from 5.7 minutes to 2.1 minutes

!time.png!

 

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE big_skewed
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE small_even
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select big_skewed.id, tabig_skewed.value, small_even.value
from big_skewed
join small_even
on small_even.id=big_skewed.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

!skew join DAG.png!  

 

 

  was:
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (tableA) which contains a a few skewed key
 * A small table (tableB) which has no skewed key and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than other tasks 
because they need to handle the skewed key

*Effect*

This feature reduce the join time from 5.7 minutes to 2.1 minutes

!time.png!

 

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

!skew join DAG.png!  

 

 


> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (big_skewed) which contains a a few skewed key
>  * A small table (small_even) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When big_skewed.join(small_even), a few tasks will be much slower than 
> other tasks because they need to handle the skewed key
> *Effect*
> This feature reduce the join time from 5.7 minutes to 2.1 minutes
> !time.png!
>  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, tabig_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort merge join is slow with 2 

[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Description: 
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (tableA) which contains a a few skewed key
 * A small table (tableB) which has no skewed key and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than other tasks 
because they need to handle the skewed key

*Effect*

This feature reduce the join time from 5.7 minutes to 2.1 minutes

!time.png!

 

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

!skew join DAG.png!  

 

 

  was:
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (tableA) which contains a a few skewed key
 * A small table (tableB) which has no skewed key and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than other tasks 
because they need to handle the skewed key

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

 

 

 


> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (tableA) which contains a a few skewed key
>  * A small table (tableB) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When tableA.join(tableB), a few tasks will be much slower than other tasks 
> because they need to handle the skewed key
> *Effect*
> This feature reduce the join time from 5.7 minutes to 2.1 minutes
> !time.png!
>  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE tableA
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE tableB
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select tableA.id, tableA.value, tableB.value
> from tableA
> join tableB
> on tableA.id=tableB.id;
> {code}
>  
> The sort merge join is slow with 2 straggle tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joint with broadcast join and the non-skewed keys are 
> joint with sort merge join
> !skew 

[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Attachment: time.png
skew join DAG.png

> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (tableA) which contains a a few skewed key
>  * A small table (tableB) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When tableA.join(tableB), a few tasks will be much slower than other tasks 
> because they need to handle the skewed key
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE tableA
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE tableB
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select tableA.id, tableA.value, tableB.value
> from tableA
> join tableB
> on tableA.id=tableB.id;
> {code}
>  
> The sort merge join is slow with 2 straggle tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joint with broadcast join and the non-skewed keys are 
> joint with sort merge join
>  
>  
>  






[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Description: 
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (tableA) which contains a a few skewed key
 * A small table (tableB) which has no skewed key and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than other tasks 
because they need to handle the skewed key

 

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

  !SMJ tasks.png!

 

*With this feature, the job took only 2.1 minutes*

The skewed keys are joint with broadcast join and the non-skewed keys are joint 
with sort merge join

 

 

 

  was:
This feature is designed to handle data skew in Join

 

*Senario*
 * A big table (tableA) which contains a a few skewed key
 * A small table (tableB) which has no skewed key and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than other tasks 
because they need to handle the skewed key

 

*Experiment*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

The sort merge join is slow with 2 straggle tasks

!SMJ DAG.png!  

 

 

 

 


> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (tableA) which contains a a few skewed key
>  * A small table (tableB) which has no skewed key and is larger than the 
> broadcast threshold 
>  * When tableA.join(tableB), a few tasks will be much slower than other tasks 
> because they need to handle the skewed key
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE tableA
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE tableB
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select tableA.id, tableA.value, tableB.value
> from tableA
> join tableB
> on tableA.id=tableB.id;
> {code}
>  
> The sort merge join is slow with 2 straggle tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joint with broadcast join and the non-skewed keys are 
> joint with sort merge join
>  
>  
>  






[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Attachment: SMJ tasks.png

> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Scenario*
>  * A big table (tableA) which contains a few skewed keys
>  * A small table (tableB) which has no skewed keys and is larger than the 
> broadcast threshold 
>  * When tableA.join(tableB), a few tasks will be much slower than the other 
> tasks because they need to handle the skewed keys
>  
> *Experiment*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE tableA
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE tableB
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with spark.sql.autoBroadcastJoinThreshold set to 3000
> {code:java}
> insert overwrite table result_with_skew
> select tableA.id, tableA.value, tableB.value
> from tableA
> join tableB
> on tableA.id=tableB.id;
> {code}
>  
> The sort merge join is slow, with 2 straggler tasks
> !SMJ DAG.png!  
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Attachment: SMJ DAG.png

> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Scenario*
>  * A big table (tableA) which contains a few skewed keys
>  * A small table (tableB) which has no skewed keys and is larger than the 
> broadcast threshold 
>  * When tableA.join(tableB), a few tasks will be much slower than the other 
> tasks because they need to handle the skewed keys
>  
> *Experiment*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE tableA
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE tableB
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with spark.sql.autoBroadcastJoinThreshold set to 3000
> {code:java}
> insert overwrite table result_with_skew
> select tableA.id, tableA.value, tableB.value
> from tableA
> join tableB
> on tableA.id=tableB.id;
> {code}
>  
> !image-2019-05-21-20-02-57-056.png!
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Description: 
This feature is designed to handle data skew in Join

 

*Scenario*
 * A big table (tableA) which contains a few skewed keys
 * A small table (tableB) which has no skewed keys and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than the other 
tasks because they need to handle the skewed keys

 

*Experiment*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with spark.sql.autoBroadcastJoinThreshold set to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

The sort merge join is slow, with 2 straggler tasks

!SMJ DAG.png!  

 

 

 

 

  was:
This feature is designed to handle data skew in Join

 

*Scenario*
 * A big table (tableA) which contains a few skewed keys
 * A small table (tableB) which has no skewed keys and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than the other 
tasks because they need to handle the skewed keys

 

*Experiment*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with spark.sql.autoBroadcastJoinThreshold set to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

!image-2019-05-21-20-02-57-056.png!

 

 

 

 


> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
> Attachments: SMJ DAG.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Scenario*
>  * A big table (tableA) which contains a few skewed keys
>  * A small table (tableB) which has no skewed keys and is larger than the 
> broadcast threshold 
>  * When tableA.join(tableB), a few tasks will be much slower than the other 
> tasks because they need to handle the skewed keys
>  
> *Experiment*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE tableA
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE tableB
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with spark.sql.autoBroadcastJoinThreshold set to 3000
> {code:java}
> insert overwrite table result_with_skew
> select tableA.id, tableA.value, tableB.value
> from tableA
> join tableB
> on tableA.id=tableB.id;
> {code}
>  
> The sort merge join is slow, with 2 straggler tasks
> !SMJ DAG.png!  
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-27792:
--
Description: 
This feature is designed to handle data skew in Join

 

*Scenario*
 * A big table (tableA) which contains a few skewed keys
 * A small table (tableB) which has no skewed keys and is larger than the 
broadcast threshold 
 * When tableA.join(tableB), a few tasks will be much slower than the other 
tasks because they need to handle the skewed keys

 

*Experiment*

tableA has 2 skewed keys 9500048 and 9500096
{code:java}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) 
+ 1) * 48 )
 ELSE CAST(id/100 AS INT) END AS STRING), 'A'
 name
FROM ids
WHERE id BETWEEN 9 AND 105000;{code}
tableB has no skewed keys
{code:java}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
 name
FROM ids
WHERE id BETWEEN 95000 AND 95050;{code}
 

Join them with spark.sql.autoBroadcastJoinThreshold set to 3000
{code:java}
insert overwrite table result_with_skew
select tableA.id, tableA.value, tableB.value
from tableA
join tableB
on tableA.id=tableB.id;
{code}
 

!image-2019-05-21-20-02-57-056.png!

 

 

 

 

> SkewJoin hint
> -
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Jason Guo
>Priority: Major
>
> This feature is designed to handle data skew in Join
>  
> *Scenario*
>  * A big table (tableA) which contains a few skewed keys
>  * A small table (tableB) which has no skewed keys and is larger than the 
> broadcast threshold 
>  * When tableA.join(tableB), a few tasks will be much slower than the other 
> tasks because they need to handle the skewed keys
>  
> *Experiment*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE tableA
> SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS 
> INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 9 AND 105000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE tableB
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 95000 AND 95050;{code}
>  
> Join them with spark.sql.autoBroadcastJoinThreshold set to 3000
> {code:java}
> insert overwrite table result_with_skew
> select tableA.id, tableA.value, tableB.value
> from tableA
> join tableB
> on tableA.id=tableB.id;
> {code}
>  
> !image-2019-05-21-20-02-57-056.png!
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27792) SkewJoin hint

2019-05-21 Thread Jason Guo (JIRA)
Jason Guo created SPARK-27792:
-

 Summary: SkewJoin hint
 Key: SPARK-27792
 URL: https://issues.apache.org/jira/browse/SPARK-27792
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.3
Reporter: Jason Guo






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571027#comment-16571027
 ] 

Jason Guo commented on SPARK-25038:
---

[~hyukjin.kwon] Gotcha

I will create a PR for this today

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Major
> Attachments: issue sql optimized.png, issue sql original.png, job 
> start optimized.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> Before optimization, it takes 2 minutes and 9 seconds to generate the Job
>  
> The SQL is issued at 2018-08-07 09:07:41
> !issue sql original.png!
> However, the job is submitted at 2018-08-07 09:09:53, which is 2 minutes and 
> 9 seconds later than the SQL issue time
> !job start original.png!
>  
> After the optimization, it takes only 4 seconds to generate the Job
> The SQL is issued at 2018-08-07 09:20:15
> !issue sql optimized.png!
>  
> And the job is submitted at 2018-08-07 09:20:19, which is 4 seconds later 
> than the SQL issue time
> !job start optimized.png!
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Description: 
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

Before optimization, it takes 2 minutes and 9 seconds to generate the Job

 

The SQL is issued at 2018-08-07 09:07:41

!issue sql original.png!

However, the job is submitted at 2018-08-07 09:09:53, which is 2 minutes and 9 
seconds later than the SQL issue time

!job start original.png!

 

After the optimization, it takes only 4 seconds to generate the Job

The SQL is issued at 2018-08-07 09:20:15

!issue sql optimized.png!

 

And the job is submitted at 2018-08-07 09:20:19, which is 4 seconds later than 
the SQL issue time

!job start optimized.png!
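As a side note, the planning gap can be measured independently of job execution. 
A small spark-shell sketch (not part of the change itself) that forces only the 
planning step looks like this:
{code:scala}
// Run in spark-shell: `spark` is the SparkSession provided by the shell.
val df = spark.sql(
  "select count(device_id) from test_tbl where date=20180731 and hour='21'")

// Accessing executedPlan forces analysis, optimization and physical planning,
// but does not submit any Spark job.
val start = System.nanoTime()
df.queryExecution.executedPlan
val planningSeconds = (System.nanoTime() - start) / 1e9
println(s"plan generation took $planningSeconds s")
{code}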

 

 

  was:
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-07 08:43:48

!issue sql original.png!

However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 17 
seconds later than the SQL issue time

  !job start original.png!

 

 


> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: issue sql optimized.png, issue sql original.png, job 
> start optimized.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> Before optimization, it takes 2 minutes and 9 seconds to generate the Job
>  
> The SQL is issued at 2018-08-07 09:07:41
> !issue sql original.png!
> However, the job is submitted at 2018-08-07 09:09:53, which is 2 minutes and 
> 9 seconds later than the SQL issue time
> !job start original.png!
>  
> After the optimization, it takes only 4 seconds to generate the Job
> The SQL is issued at 2018-08-07 09:20:15
> !issue sql optimized.png!
>  
> And the job is submitted at 2018-08-07 09:20:19, which is 4 seconds later 
> than the SQL issue time
> !job start optimized.png!
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: (was: job start original.png)

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: issue sql optimized.png, issue sql original.png, job 
> start optimized.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !issue sql original.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !job start original.png!
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: (was: issue sql original.png)

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: issue sql optimized.png, issue sql original.png, job 
> start optimized.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !issue sql original.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !job start original.png!
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: job start original.png
job start optimized.png
issue sql original.png
issue sql optimized.png

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: issue sql optimized.png, issue sql original.png, job 
> start optimized.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !issue sql original.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !job start original.png!
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Description: 
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-07 08:43:48

!issue sql original.png!

However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 17 
seconds later than the SQL issue time

  !job start original.png!

 

 

  was:
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-07 08:43:48

!image-2018-08-07-08-52-00-558.png!

However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 17 
seconds later than the SQL issue time

  !image-2018-08-07-08-52-09-648.png!

 

 

 


> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: issue sql original.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !issue sql original.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !job start original.png!
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: issue sql original.png

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: issue sql original.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !image-2018-08-07-08-52-00-558.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !image-2018-08-07-08-52-09-648.png!
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: job start original.png

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: issue sql original.png, job start original.png
>
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !image-2018-08-07-08-52-00-558.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !image-2018-08-07-08-52-09-648.png!
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: start.png
issue.png

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !image-2018-08-07-08-52-00-558.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !image-2018-08-07-08-52-09-648.png!
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: (was: start.png)

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !image-2018-08-07-08-52-00-558.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !image-2018-08-07-08-52-09-648.png!
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Attachment: (was: issue.png)

> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !image-2018-08-07-08-52-00-558.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !image-2018-08-07-08-52-09-648.png!
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Description: 
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-07 08:43:48

!image-2018-08-07-08-52-00-558.png!

However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 17 
seconds later than the SQL issue time

  !image-2018-08-07-08-52-09-648.png!

 

 

 

  was:
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-07 08:43:48

!image-2018-08-07-08-48-28-753.png!

However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 17 
seconds later than the SQL issue time

!image-2018-08-07-08-47-06-321.png!  

 

 

 

 


> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !image-2018-08-07-08-52-00-558.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
>   !image-2018-08-07-08-52-09-648.png!
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-25038:
--
Description: 
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-07 08:43:48

!image-2018-08-07-08-48-28-753.png!

However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 17 
seconds later than the SQL issue time

!image-2018-08-07-08-47-06-321.png!  

 

 

 

 

  was:
When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-05 18:33:21

!image-2018-08-07-08-38-01-984.png!

However, the job is submitted at 2018-08-05 18:34:45, which is 1 minute and 24 
seconds later than the SQL issue time

 

 

 


> Accelerate Spark Plan generation when Spark SQL read large amount of data
> -
>
> Key: SPARK-25038
> URL: https://issues.apache.org/jira/browse/SPARK-25038
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
>
> When Spark SQL reads a large amount of data, it takes a long time (more than 
> 10 minutes) to generate the physical plan and then the ActiveJob
>  
> Example:
> There is a table which is partitioned by date and hour. There are more than 
> 13 TB of data each hour and 185 TB per day. Even when we issue a very simple 
> SQL statement, it takes a long time to generate the ActiveJob
>  
> The SQL statement is
> {code:java}
> select count(device_id) from test_tbl where date=20180731 and hour='21';
> {code}
>  
> The SQL is issued at 2018-08-07 08:43:48
> !image-2018-08-07-08-48-28-753.png!
> However, the job is submitted at 2018-08-07 08:46:05, which is 2 minutes and 
> 17 seconds later than the SQL issue time
> !image-2018-08-07-08-47-06-321.png!  
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data

2018-08-06 Thread Jason Guo (JIRA)
Jason Guo created SPARK-25038:
-

 Summary: Accelerate Spark Plan generation when Spark SQL read 
large amount of data
 Key: SPARK-25038
 URL: https://issues.apache.org/jira/browse/SPARK-25038
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.1
Reporter: Jason Guo


When Spark SQL reads a large amount of data, it takes a long time (more than 10 
minutes) to generate the physical plan and then the ActiveJob

 

Example:

There is a table which is partitioned by date and hour. There are more than 13 
TB of data each hour and 185 TB per day. Even when we issue a very simple SQL 
statement, it takes a long time to generate the ActiveJob

 

The SQL statement is
{code:java}
select count(device_id) from test_tbl where date=20180731 and hour='21';
{code}
 

The SQL is issued at 2018-08-05 18:33:21

!image-2018-08-07-08-38-01-984.png!

However, the job is submitted at 2018-08-05 18:34:45, which is 1 minute and 24 
seconds later than the SQL issue time

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation

2018-08-06 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-24906:
--
Summary: Adaptively set split size for columnar file to ensure the task 
read data size fit expectation  (was: Enlarge split size for columnar file to 
ensure the task read enough data)

> Adaptively set split size for columnar file to ensure the task read data size 
> fit expectation
> -
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar file formats such as Parquet, when Spark SQL reads a table, each 
> split will be 128 MB by default since spark.sql.files.maxPartitionBytes 
> defaults to 128 MB. Even when the user sets it to a large value, such as 512 
> MB, a task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL needs only a few of 
> them, and Spark will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integers and the other 20 
> are longs. When a query reads one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-25 Thread Jason Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556413#comment-16556413
 ] 

Jason Guo commented on SPARK-24906:
---

[~maropu]  [~viirya]  What do you think about this idea ?

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar file formats such as Parquet, when Spark SQL reads a table, each 
> split will be 128 MB by default since spark.sql.files.maxPartitionBytes 
> defaults to 128 MB. Even when the user sets it to a large value, such as 512 
> MB, a task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL needs only a few of 
> them, and Spark will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integers and the other 20 
> are longs. When a query reads one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-25 Thread Jason Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972
 ] 

Jason Guo edited comment on SPARK-24906 at 7/25/18 6:09 AM:


Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
cluster  

1. When this will be enabled?

There is a configuration item named 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge the 
maxPartitionBytes. By default, this parameter is set to false. (Our ad-hoc 
queries will set it to true.)

With this configuration, users know that Spark will adjust the partition / 
split size adaptively. If users do not want this behavior, they can disable it.

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we get the total default size of all the columns 
in the table (henceforth referred to as "T"). Then we get the total default 
size of the requiredSchema columns (henceforth referred to as "R"). The 
multiplier should be T / R. Then maxPartitionBytes and openCostInBytes will be 
enlarged by a factor of T / R.

 

 

 


was (Author: habren):
Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
cluster (more than 40 thousand nodes in total and more than 10 thousand nodes 
for a single cluster). 

1. When this will be enabled?

There is a configuration item named 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge the 
maxPartitionBytes. By default, this parameter is set to false. (Our ad-hoc 
queries will set it to true.)

With this configuration, users know that Spark will adjust the partition / 
split size adaptively. If users do not want this behavior, they can disable it.

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we get the total default size of all the columns 
in the table (henceforth referred to as "T"). Then we get the total default 
size of the requiredSchema columns (henceforth referred to as "R"). The 
multiplier should be T / R. Then maxPartitionBytes and openCostInBytes will be 
enlarged by a factor of T / R.

 

 

 

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar file formats such as Parquet, when Spark SQL reads a table, each 
> split will be 128 MB by default since spark.sql.files.maxPartitionBytes 
> defaults to 128 MB. Even when the user sets it to a large value, such as 512 
> MB, a task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL needs only a few of 
> them, and Spark will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integers and the other 20 
> are longs. When a query reads one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-24 Thread Jason Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972
 ] 

Jason Guo edited comment on SPARK-24906 at 7/25/18 1:03 AM:


Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
cluster (more than 40 thousand nodes in total and more than 10 thousand nodes 
for a single cluster). 

1. When this will be enabled?

There is a configuration item named 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge the 
maxPartitionBytes. By default, this parameter is set to false. (Our ad-hoc 
queries will set it to true.)

With this configuration, users know that Spark will adjust the partition / 
split size adaptively. If users do not want this behavior, they can disable it.

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we get the total default size of all the columns 
in the table (henceforth referred to as "T"). Then we get the total default 
size of the requiredSchema columns (henceforth referred to as "R"). The 
multiplier should be T / R. Then maxPartitionBytes and openCostInBytes will be 
enlarged by a factor of T / R.

 

 

 


was (Author: habren):
Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
cluster (more than 40 thousand nodes in total and more than 10 thousand nodes 
for a single cluster). 

1. When this is enabled

There is a configuration item named 

 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge the 
maxPartitionBytes. By default, this parameter is set to false. (Our ad-hoc 
queries will set it to true.)

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we get the total default size of all the columns 
in the table (henceforth referred to as "T"). Then we get the total default 
size of the requiredSchema columns (henceforth referred to as "R"). The 
multiplier should be T / R. Then maxPartitionBytes and openCostInBytes will be 
enlarged by a factor of T / R.

 

 

 

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar file formats such as Parquet, when Spark SQL reads a table, each 
> split will be 128 MB by default since spark.sql.files.maxPartitionBytes 
> defaults to 128 MB. Even when the user sets it to a large value, such as 512 
> MB, a task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL needs only a few of 
> them, and Spark will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integers and the other 20 
> are longs. When a query reads one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-24 Thread Jason Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972
 ] 

Jason Guo commented on SPARK-24906:
---

Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
cluster (more than 40 thousand nodes in total and more than 10 thousand nodes 
for a single cluster). 

1. When this is enabled

There is a configuration item named 

 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge the 
maxPartitionBytes. By default, this parameter is set to false. (Our ad-hoc 
queries will set it to true.)

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we get the total default size of all the columns 
in the table (henceforth referred to as "T"). Then we get the total default 
size of the requiredSchema columns (henceforth referred to as "R"). The 
multiplier should be T / R. Then maxPartitionBytes and openCostInBytes will be 
enlarged by a factor of T / R.
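For illustration only, here is a rough sketch of that T / R calculation. The 
StructType inputs and the helper name are made up for the example; this is not 
the actual patch code:
{code:scala}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Enlarge maxPartitionBytes by T / R, where T is the summed defaultSize of all
// columns in the table schema and R the summed defaultSize of the required columns.
def adaptiveMaxPartitionBytes(fullSchema: StructType,
                              requiredSchema: StructType,
                              maxPartitionBytes: Long): Long = {
  val t = fullSchema.fields.map(_.dataType.defaultSize.toLong).sum
  val r = math.max(1L, requiredSchema.fields.map(_.dataType.defaultSize.toLong).sum)
  maxPartitionBytes * t / r
}

// The 40-column example from the description: 20 ints and 20 longs, and a query
// that reads one int and one long column -> (20*4 + 20*8) / (4 + 8) = 20x.
val full = StructType(
  (1 to 20).map(i => StructField(s"i$i", IntegerType)) ++
  (1 to 20).map(i => StructField(s"l$i", LongType)))
val required = StructType(Seq(StructField("i1", IntegerType), StructField("l1", LongType)))

adaptiveMaxPartitionBytes(full, required, 128L * 1024 * 1024)  // = 20 * 128 MB
{code}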

 

 

 

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar file formats such as Parquet, when Spark SQL reads a table, each 
> split will be 128 MB by default since spark.sql.files.maxPartitionBytes 
> defaults to 128 MB. Even when the user sets it to a large value, such as 512 
> MB, a task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL needs only a few of 
> them, and Spark will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integers and the other 20 
> are longs. When a query reads one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job last only 30 
> seconds. It almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  






[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-24 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-24906:
--
Description: 
For columnar files, when Spark SQL reads the table, each split will be 128 MB 
by default since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even 
when the user sets it to a large value, such as 512 MB, the task may read only 
a few MB or even hundreds of KB, because the table (Parquet) may consist of 
dozens of columns while the SQL only needs a few of them, and Spark will prune 
the unnecessary columns.

In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
adaptively.

For example, suppose there are 40 columns: 20 are integer and the other 20 are 
long. When a query touches one integer column and one long column, 
maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20.

With this optimization, the number of tasks will be smaller and the job will 
run faster. More importantly, for a very large cluster (more than 10 thousand 
nodes), it will relieve the RM's scheduling pressure.

Here is the test:

The table named test2 has more than 40 columns and more than 5 TB of data each 
hour.

When we issue a very simple query:

{code:java}
select count(device_id) from test2 where date=20180708 and hour='23'{code}

There are 72176 tasks and the duration of the job is 4.8 minutes.

!image-2018-07-24-20-26-32-441.png!

Most tasks last less than 1 second and read less than 1.5 MB of data.

!image-2018-07-24-20-28-06-269.png!

After the optimization, there are only 1615 tasks and the job lasts only 30 
seconds. It is almost 10 times faster.

!image-2018-07-24-20-29-24-797.png!

The median read size is 44.2 MB.

!image-2018-07-24-20-30-24-552.png!

 

  was:
For columnar files, when Spark SQL reads the table, each split will be 128 MB 
by default since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even 
when the user sets it to a large value, such as 512 MB, the task may read only 
a few MB or even hundreds of KB, because the table (Parquet) may consist of 
dozens of columns while the SQL only needs a few of them.

In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
adaptively.

For example, suppose there are 40 columns: 20 are integer and the other 20 are 
long. When a query touches one integer column and one long column, 
maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20.

With this optimization, the number of tasks will be smaller and the job will 
run faster. More importantly, for a very large cluster (more than 10 thousand 
nodes), it will relieve the RM's scheduling pressure.


> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files, when Spark SQL reads the table, each split will be 
> 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 
> 128 MB. Even when the user sets it to a large value, such as 512 MB, the 
> task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL only needs a few of 
> them, and Spark will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 
> are long. When a query touches one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 
> thousand nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test:
>  
> The table named test2 has more than 40 columns and more than 5 TB of data 
> each hour.
> When we issue a very simple query:
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes.
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB of data.
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median read size is 44.2 MB.
> 

[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-24 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-24906:
--
Attachment: image-2018-07-24-20-30-24-552.png

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files, when Spark SQL reads the table, each split will be 
> 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 
> 128 MB. Even when the user sets it to a large value, such as 512 MB, the 
> task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL only needs a few of 
> them.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 
> are long. When a query touches one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 
> thousand nodes), it will relieve the RM's scheduling pressure.






[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-24 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-24906:
--
Attachment: image-2018-07-24-20-29-24-797.png

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png
>
>
> For columnar files, when Spark SQL reads the table, each split will be 
> 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 
> 128 MB. Even when the user sets it to a large value, such as 512 MB, the 
> task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL only needs a few of 
> them.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 
> are long. When a query touches one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 
> thousand nodes), it will relieve the RM's scheduling pressure.






[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-24 Thread Jason Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Guo updated SPARK-24906:
--
Attachment: image-2018-07-24-20-28-06-269.png

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png
>
>
> For columnar files, when Spark SQL reads the table, each split will be 
> 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 
> 128 MB. Even when the user sets it to a large value, such as 512 MB, the 
> task may read only a few MB or even hundreds of KB, because the table 
> (Parquet) may consist of dozens of columns while the SQL only needs a few of 
> them.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 
> are long. When a query touches one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 
> thousand nodes), it will relieve the RM's scheduling pressure.


