[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-07-02 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071885#comment-16071885
 ] 

liyunzhang_intel commented on HIVE-16840:
-

 For a query like "select * from A order by key limit 10", the limit will be pushed 
down to the shuffle data, so each RS will output only 10 records. The effect is the 
same as HIVE-16840.patch, so I'm closing the jira. Thanks for your suggestions and 
time on it. The reason TPC-DS query17 hangs in this jira is HIVE-17010 and other 
issues.

> Investigate the performance of order by limit in HoS
> 
>
> Key: HIVE-16840
> URL: https://issues.apache.org/jira/browse/HIVE-16840
> Project: Hive
>  Issue Type: Bug
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: HIVE-16840.patch
>
>
> We found that on 1TB data of TPC-DS, q17 of TPC-DS hanged.
> {code}
>  select  i_item_id
>,i_item_desc
>,s_state
>,count(ss_quantity) as store_sales_quantitycount
>,avg(ss_quantity) as store_sales_quantityave
>,stddev_samp(ss_quantity) as store_sales_quantitystdev
>,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov
>,count(sr_return_quantity) as store_returns_quantitycount
>,avg(sr_return_quantity) as store_returns_quantityave
>,stddev_samp(sr_return_quantity) as store_returns_quantitystdev
>,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as 
> store_returns_quantitycov
>,count(cs_quantity) as catalog_sales_quantitycount ,avg(cs_quantity) 
> as catalog_sales_quantityave
>,stddev_samp(cs_quantity)/avg(cs_quantity) as 
> catalog_sales_quantitystdev
>,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov
>  from store_sales
>  ,store_returns
>  ,catalog_sales
>  ,date_dim d1
>  ,date_dim d2
>  ,date_dim d3
>  ,store
>  ,item
>  where d1.d_quarter_name = '2000Q1'
>and d1.d_date_sk = store_sales.ss_sold_date_sk
>and item.i_item_sk = store_sales.ss_item_sk
>and store.s_store_sk = store_sales.ss_store_sk
>and store_sales.ss_customer_sk = store_returns.sr_customer_sk
>and store_sales.ss_item_sk = store_returns.sr_item_sk
>and store_sales.ss_ticket_number = store_returns.sr_ticket_number
>and store_returns.sr_returned_date_sk = d2.d_date_sk
>and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk
>and store_returns.sr_item_sk = catalog_sales.cs_item_sk
>and catalog_sales.cs_sold_date_sk = d3.d_date_sk
>and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>  group by i_item_id
>  ,i_item_desc
>  ,s_state
>  order by i_item_id
>  ,i_item_desc
>  ,s_state
> limit 100;
> {code}
> The reason the script hung is that we use only 1 task to implement the 
> sort.
> {code}
> STAGE PLANS:
>   Stage: Stage-1
> Spark
>   Edges:
> Reducer 10 <- Reducer 9 (SORT, 1)
> Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 889), Map 11 
> (PARTITION-LEVEL SORT, 889)
> Reducer 3 <- Map 12 (PARTITION-LEVEL SORT, 1009), Reducer 2 
> (PARTITION-LEVEL SORT, 1009)
> Reducer 4 <- Map 13 (PARTITION-LEVEL SORT, 683), Reducer 3 
> (PARTITION-LEVEL SORT, 683)
> Reducer 5 <- Map 14 (PARTITION-LEVEL SORT, 751), Reducer 4 
> (PARTITION-LEVEL SORT, 751)
> Reducer 6 <- Map 15 (PARTITION-LEVEL SORT, 826), Reducer 5 
> (PARTITION-LEVEL SORT, 826)
> Reducer 7 <- Map 16 (PARTITION-LEVEL SORT, 909), Reducer 6 
> (PARTITION-LEVEL SORT, 909)
> Reducer 8 <- Map 17 (PARTITION-LEVEL SORT, 1001), Reducer 7 
> (PARTITION-LEVEL SORT, 1001)
> Reducer 9 <- Reducer 8 (GROUP, 2)
> {code}
> The parallelism of the final sort reducer is 1. It is an order by limit case, so we 
> use 1 task to execute to ensure correctness, but the performance is poor.
> The reason we use 1 task to implement order by limit is 
> [here|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207]





[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-26 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062808#comment-16062808
 ] 

liyunzhang_intel commented on HIVE-16840:
-

Limit pushdown was implemented in HIVE-3562.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-26 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062651#comment-16062651
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~lirui]: thanks for your investigation.
bq. hive should have already pushed down the limit to the upstream of shuffle. 
Looking at the RS code, it uses a TopN hash to track the top N keys in input. 
Ideally, each RS will only output N records. I tried some simple query to 
verify how this saves shuffled data.
I saw the topN in ReduceSinkOperator, but let me spend some time to 
investigate. [~xuefuz]: can you give us some suggestions?



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-23 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060835#comment-16060835
 ] 

Rui Li commented on HIVE-16840:
---

Hi [~xuefuz], I doubt Spark can take much advantage of it because the reducer 
fetches all the data anyway. But I agree a limit with a large number is rare, so 
it's OK to leave that aside for the moment.
Another thing I'm not sure about: Hive should have already pushed down the limit 
to the upstream of the shuffle. Looking at the RS code, it uses a TopN hash to 
track the top N keys in the input. Ideally, each RS will only output N records. I 
tried some simple queries to verify how this saves shuffled data.
[~kellyzly], do you know why it's not working as expected in your case?
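
To illustrate the mechanism (a minimal sketch of the idea only, with hypothetical 
names, not Hive's actual TopNHash class), an RS can keep a bounded heap of the best 
N keys seen so far and forward a row only if its key can still be in the top N:
{code}
import java.util.PriorityQueue;

/** Minimal sketch of top-N filtering at a reduce sink (hypothetical, not Hive's TopNHash). */
public class TopNFilter {
  private final int n;
  // Max-heap on the key, so the worst retained key sits at the head and can be evicted.
  private final PriorityQueue<String> heap;

  public TopNFilter(int n) {
    this.n = n;
    this.heap = new PriorityQueue<>((a, b) -> b.compareTo(a));
  }

  /** Returns true if the row should be emitted to the shuffle. */
  public boolean offer(String key) {
    if (heap.size() < n) {
      heap.add(key);
      return true;
    }
    if (key.compareTo(heap.peek()) < 0) { // better than the current n-th key
      heap.poll();                        // evict the worst retained key
      heap.add(key);
      return true;
    }
    return false;                         // cannot be in the top N; skip shuffling this row
  }
}
{code}
Rows emitted before better keys arrive may later fall out of the top N; the 
downstream LIMIT removes them, so correctness only requires that no qualifying row 
is dropped, while shuffle volume stays close to N records per RS.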



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-21 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057563#comment-16057563
 ] 

Xuefu Zhang commented on HIVE-16840:


Re: turning off the optimization when the limit number is too big

It might be over-engineering to try to be too smart here. In my opinion, it's very 
rare for LIMIT to come with a number that is big compared to the total number of 
rows. The penalty arises only when the limit is close to the total row count, and 
even then it's not clear how large the penalty is.

Consider an extreme case: there are 1000 rows and the user limits to 1000 rows. 
With the optimization, suppose there are 10 partitions, each sorting 100 rows 
within the partition. All of the sorted 100-row sets (1000 rows in total) are 
shuffled to one reducer doing the global sort. That last step may not be as costly 
as the global shuffle without the optimization, because each 100-row set is already 
sorted. (I'm not entirely sure whether Spark can take advantage of that, though.)

It might be sufficient to provide a configuration to turn this optimization off 
if users know what they are doing. Otherwise, the optimization should be on by 
default.
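
The final step in this extreme case is essentially a k-way merge of pre-sorted 
runs, which needs roughly total*log(k) comparisons rather than total*log(total) for 
a full sort. A minimal illustration of why merging sorted runs is cheap (plain Java, 
illustrative only, not Hive or Spark code):
{code}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Sketch: merge k already-sorted runs with a heap of size k. */
public class SortedRunMerge {
  public static List<Integer> merge(List<List<Integer>> sortedRuns) {
    // Heap entries are {headValue, runIndex}, ordered by headValue.
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
    List<Iterator<Integer>> iters = new ArrayList<>();
    for (int i = 0; i < sortedRuns.size(); i++) {
      Iterator<Integer> it = sortedRuns.get(i).iterator();
      iters.add(it);
      if (it.hasNext()) {
        heap.add(new int[] {it.next(), i});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();                   // smallest head across all runs
      out.add(top[0]);
      Iterator<Integer> it = iters.get(top[1]);
      if (it.hasNext()) {
        heap.add(new int[] {it.next(), top[1]}); // refill from the same run
      }
    }
    return out;
  }
}
{code}
With 10 runs of 100 rows each, the heap never holds more than 10 entries, which is 
why the single reducer's work can stay modest even though it sees all 1000 rows.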


[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-20 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057007#comment-16057007
 ] 

Rui Li commented on HIVE-16840:
---

Besides, it's better to add a separate optimizer for this optimization. 
SetSparkReducerParallelism is only intended to set parallelism for RSes.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-20 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056928#comment-16056928
 ] 

Rui Li commented on HIVE-16840:
---

bq. you mean that if the limit number is too large...
Yeah. But it's a little tricky to set a proper upper bound for it. How about 
something like this: if statistics are available, we estimate the number of rows 
in the input of the RS; if the limit number is, say, >= 90% of the rows, we skip 
the optimization. If statistics are unavailable, we run the optimization anyway.
You can find how we estimate the number of bytes in SetSparkReducerParallelism; I 
guess we can estimate the number of rows similarly.
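
A rough sketch of that guard (hypothetical names and threshold; the real check 
would read the row estimate from Hive statistics on the RS input, analogous to the 
byte estimate in SetSparkReducerParallelism):
{code}
/** Sketch of the proposed statistics-based guard (hypothetical, not actual Hive code). */
public final class OrderByLimitGuard {
  private static final double SKIP_THRESHOLD = 0.9; // skip when limit >= 90% of estimated rows

  /**
   * @param limit         the LIMIT value from the query
   * @param estimatedRows row estimate from statistics, or -1 when statistics are unavailable
   * @return true if the parallel order-by-limit optimization should be applied
   */
  public static boolean shouldOptimize(long limit, long estimatedRows) {
    if (estimatedRows < 0) {
      return true; // no statistics: run the optimization anyway, as proposed above
    }
    return limit < SKIP_THRESHOLD * estimatedRows;
  }
}
{code}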



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-20 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056858#comment-16056858
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~lirui]: 
bq. If so, I wonder whether we should put a limit on the limited number. E.g. 
if the number is too large, we should skip this optimization.
You mean that if the limit number is too large (e.g. select * from A order by ColB 
limit 100 when A has only 99 records in total), there is no performance 
improvement and maybe even degradation, because there is now 1 extra reduce stage.
bq. Besides, I don't think we need to add the sortLimit flag to RS. ReduceSinkDesc 
has a flag hasOrderBy indicating whether global order is needed.
Thanks for the suggestion.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-20 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056852#comment-16056852
 ] 

Rui Li commented on HIVE-16840:
---

To clarify, the idea is to introduce an extra MR shuffle and push the limit to it, 
right? If so, I wonder whether we should put a bound on the limit number, e.g. if 
the number is too large, we should skip this optimization.
Besides, I don't think we need to add the sortLimit flag to RS. ReduceSinkDesc 
has a flag hasOrderBy indicating whether global order is needed. We can set 
that to false for the new RS, and GenSparkUtils#getEdgeProperty should then give 
us the MR shuffle.
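
Under that suggestion, the edge type would follow from the flags on the new RS, 
roughly like this (a simplified sketch of the decision only, using the edge labels 
from the explain output above; the real GenSparkUtils#getEdgeProperty covers more 
cases):
{code}
/** Simplified sketch only; not the actual GenSparkUtils#getEdgeProperty implementation. */
public class EdgeTypeSketch {
  static String edgeTypeFor(boolean hasOrderBy, boolean sortedWithinPartition) {
    if (hasOrderBy) {
      return "SORT";                  // global order: sortByKey, 1 (or few) partitions
    }
    if (sortedWithinPartition) {
      return "PARTITION-LEVEL SORT";  // repartitionAndSortWithinPartitions
    }
    return "GROUP";                   // plain hash shuffle, no ordering requirement
  }
}
{code}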



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-20 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056593#comment-16056593
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz], [~lirui], [~Ferd], [~csun]: attached is HIVE-16840.1.patch.
Changes:
1. Change the physical plan in SetSparkReducerParallelism#process. If 
needSetSparkReucerParallelism returns false, the current sink may be an order by 
limit case. Add a newSparkSortRS (actually a ReduceSink), a newSel (actually a 
Select), and a newLimit (actually a Limit) before the sink.
The original physical plan is 
{code} ...-RS-SEL-LIMIT{code}
and the new physical plan is 
{code} ...-newSparkSortRS-newSel-newLimit-RS-LIMIT{code}
Currently I added SetSparkReducerParallelism#getNumReducerForSparkSortRS, which 
returns 10; this sets the parallelism for newSparkSortRS. I will update the 
function in the next patch.
2. Add a property sortLimit to ReduceSinkOperator. If it is true, use 
partition-level sort rather than global sort in GenSparkUtils#getEdgeProperty.


The patch is not fully tested; I have only tested a simple qfile (below), but I 
think we can parallelize the sort this way. Please review; I will start full 
testing. A sketch of the resulting two-stage execution follows the query.
{code}
select key,value from src order by key limit 10;
{code}
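
At the RDD level, the rewritten plan corresponds roughly to a two-stage top-N. 
Here is a minimal sketch of that shape using the plain Spark 2.x Java API (an 
illustration of the idea under my reading of the patch, not the patch code 
itself): sort within each of the parallel partitions, keep the first N rows per 
partition, then globally sort the at most parallelism*N survivors in a single 
task and take N.
{code}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

/** Sketch of the two-stage order-by-limit shape (illustrative, not the patch itself). */
public class TwoStageTopN {
  public static List<Tuple2<String, String>> topN(
      JavaPairRDD<String, String> input, int parallelism, int n) {
    // Stage 1 (newSparkSortRS + newLimit): partition-level sort across `parallelism`
    // tasks, then keep only the first n rows of each sorted partition.
    JavaPairRDD<String, String> partial = input
        .repartitionAndSortWithinPartitions(new HashPartitioner(parallelism))
        .mapPartitions(it -> firstN(it, n).iterator())
        .mapToPair(t -> t);
    // Stage 2 (RS + LIMIT): globally sort the at most parallelism*n survivors
    // in a single task and take the final n rows.
    return partial.sortByKey(true, 1).take(n);
  }

  private static <T> List<T> firstN(Iterator<T> it, int n) {
    List<T> out = new ArrayList<>(n);
    while (it.hasNext() && out.size() < n) {
      out.add(it.next());
    }
    return out;
  }
}
{code}
This is correct because every global top-N row sits in exactly one partition and 
has at most N-1 rows ranked above it there, so it survives the per-partition limit.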


[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-12 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16047194#comment-16047194
 ] 

Xuefu Zhang commented on HIVE-16840:


[~kellyzly], sorry for the confusion. I think you're right: the final N keys 
will definitely appear among the first N keys of each partition. Thanks.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-12 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046345#comment-16046345
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz]:  
bq. I think you cannot use partition-level sort in lieu of a global sort unless 
you use a range partitioner, in which case I guess you basically get the same 
global sort.
What confuses me is why the shuffle from Map 1 to Reducer 2 needs a global sort. 
In my opinion, we only need to guarantee that the elements are sorted within each 
of the M partitions and take the first N from each, then sort the M*N rows 
globally in 1 task and finally take N.

In SortByShuffler.java, we use repartitionAndSortWithinPartitions to implement 
the sort within each partition.
{code}
  @Override
  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
    JavaPairRDD<HiveKey, BytesWritable> rdd;
    if (totalOrder) {
      if (numPartitions > 0) {
        if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) {
          input.persist(StorageLevel.DISK_ONLY());
          sparkPlan.addCachedRDDId(input.id());
        }
        rdd = input.sortByKey(true, numPartitions);
      } else {
        rdd = input.sortByKey(true);
      }
    } else {
      Partitioner partitioner = new HashPartitioner(numPartitions);
      rdd = input.repartitionAndSortWithinPartitions(partitioner);
    }
    return rdd;
  }
{code}
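
To make the contrast between the two branches concrete, a toy run (standard Spark 
behavior, nothing Hive-specific): sortByKey uses a range partitioner, so 
concatenating the output partitions gives a total order, while 
repartitionAndSortWithinPartitions with a HashPartitioner only orders keys inside 
each partition.
{code}
import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ShuffleContrast {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("contrast").setMaster("local[2]"));
    JavaPairRDD<Integer, String> input = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>(3, "c"), new Tuple2<>(1, "a"), new Tuple2<>(4, "d"),
        new Tuple2<>(6, "f"), new Tuple2<>(2, "b"), new Tuple2<>(5, "e")));

    // Range partitioner: partition 0 holds the smallest keys, e.g. [1,2,3],
    // partition 1 the largest, e.g. [4,5,6]; partitions concatenate to a global order.
    System.out.println(input.sortByKey(true, 2).glom().collect());

    // Hash partitioner: each partition is sorted internally, e.g. [2,4,6] and [1,3,5],
    // but keys interleave across partitions; a final merge/sort is still needed.
    System.out.println(input.repartitionAndSortWithinPartitions(new HashPartitioner(2))
        .glom().collect());

    sc.stop();
  }
}
{code}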


[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-09 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044058#comment-16044058
 ] 

Xuefu Zhang commented on HIVE-16840:


Re: only ensure the elements in each partition are ordered

I think you cannot use a partition-level sort in lieu of a global sort unless you 
use a range partitioner, in which case I guess you basically get the same 
global sort.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044036#comment-16044036
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz]:  
bq. You might take a look to see if it's possible to modify the plan as I 
suggested above. That's more aligned with the way HoS works.
I will try to modify the plan.
One thing I need to confirm: we need not ensure a global sort in Map 1 (SORT, 100); 
we only need to ensure the elements within each partition are ordered.
{code}
STAGE PLANS:
  Stage: Stage-1
Spark
  Edges:
Reducer 2 <- Map 1 (SORT, 100)
Reducer 3 <- Reducer 2 (SORT, 1)
{code}



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044007#comment-16044007
 ] 

Xuefu Zhang commented on HIVE-16840:


This might not fly. SparkContext is only available on the driver, and in the shuffle() implementation we can only use RDD transformations.
You might take a look to see if it's possible to modify the plan as I suggested above. That's more in line with the way HoS works.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043998#comment-16043998
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz]: maybe we can make the RDD as follows:
{code}
// takeOrdered(n) is an action whose return value is Array[T]
val composite1 = sc.parallelize(1 to 200, partitionNum)
  .map(p => (1 - p, p))
  .takeOrdered(firstN)

val composite2 = SparkContext.getOrCreate.makeRDD(composite1)
{code}

Maybe this is not very good.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043985#comment-16043985
 ] 

Xuefu Zhang commented on HIVE-16840:


[~kellyzly], I believe take(n) and takeOrdered(n) are driver-side actions instead of transformations. They don't return an RDD instance, so you probably have no way to return an RDD from the shuffle() implementation.
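
A tiny spark-shell illustration of that distinction (made-up data):
{code}
import org.apache.spark.rdd.RDD

val rdd = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

// transformation: returns another RDD, so it could be used inside shuffle()
val sorted: RDD[(Int, String)] = rdd.sortByKey()

// actions: return local values to the driver, not an RDD
val topTwo: Array[(Int, String)] = rdd.takeOrdered(2)
val firstTwo: Array[(Int, String)] = rdd.take(2)
{code}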



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043978#comment-16043978
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz]: in the current code base, the plan for select * from T order by id limit 10 is
{code}
STAGE PLANS:
  Stage: Stage-1
Spark
  Edges:
Reducer 2 <- Map 1 (SORT, 1)
{code}
The limit operator is in Reducer 2.

What I want is to add a SortByLimitShuffler to deal with the order by + limit case. The Spark plan will be the following, with the limit operator removed from Reducer 2.
{code}
STAGE PLANS:
  Stage: Stage-1
Spark
  Edges:
Reducer 2 <- Map 1 (SORT, 100)
{code}
The reason why there is only Reducer 2, not Reducer 2 and Reducer 3, is that we implement order by limit inside SortByLimitShuffler.java, in SortByLimitShuffler#shuffle:
{code}
public JavaPairRDD shuffle(JavaPairRDD input, int numPartitions) {
  JavaPairRDD rdd;
  // implement order by limit via RDD#takeOrdered(n), RDD#sortByKey().take(n),
  // or other approaches
  ...
  return rdd;
}
{code}
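
For illustration, a hypothetical transformation-only body in spark-shell terms, using the sortByKey + zipWithIndex + filter combination proposed elsewhere in this thread (plain RDD types, not the real shuffler signature):
{code}
import org.apache.spark.rdd.RDD

// every step is a transformation, so an RDD can be returned to the caller;
// `limit` would come from the limit operator removed from the reduce tree
def shuffleSketch(input: RDD[(Int, String)], limit: Int): RDD[(Int, String)] =
  input.sortByKey()
       .zipWithIndex()
       .filter { case (_, idx) => idx < limit }
       .keys
{code}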





[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043964#comment-16043964
 ] 

Xuefu Zhang commented on HIVE-16840:


[~kellyzly], Thanks for sharing the info. This might not fit into HoS's design; I reused your example just to demo the idea. To be more specific, I was thinking that for a query like select * from T order by id limit 10, instead of generating a Spark plan like this:
{code}
STAGE PLANS:
  Stage: Stage-1
Spark
  Edges:
Reducer 2 <- Map 1 (SORT, 1)
{code}
we generate a plan like this:
{code}
STAGE PLANS:
  Stage: Stage-1
Spark
  Edges:
Reducer 2 <- Map 1 (SORT, 100)
Reducer 3 <- Reducer 2 (SORT, 1)
{code}
In {{Reducer2}} and {{Reducer3}} there is a limit operator with 10 as the limit. In essence, we introduce one additional shuffle to sort, but only over 1000 rows of data (100 partitions x 10 rows each).



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043951#comment-16043951
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz]: I found that RDD provides an [order by limit|https://stackoverflow.com/questions/32947978/how-to-sort-an-rdd-and-limit-in-spark] implementation in Spark.
Which method to use for order by limit depends on the driver memory:
{code}
import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)

// using plain RDDs:
import org.apache.spark.rdd.RDD
import scala.math.Ordering

val rdd: RDD[Foo] = sc
  .parallelize(Seq(
    ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
    ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
  .toDF("name", "createDate")
  .withColumn("createDate", $"createDate".cast("date"))
  .as[Foo].rdd
{code}
1. The data fits into driver memory.
1.1 The fraction you want is relatively small; here [RDD#takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1416] is implemented with the same algorithm you mentioned above:
{code}
rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
{code}
1.2 The fraction you want is relatively large:
{code}
rdd.sortBy(_.createDate.getTime).take(n)
{code}

2. The data cannot fit into driver memory:
{code}
rdd
  .sortBy(_.createDate.getTime)
  .zipWithIndex
  .filter{case (_, idx) => idx < n}
  .keys
{code}



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-08 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043507#comment-16043507
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz] and [~lirui]: thanks for your suggestions. Will try [~xuefuz]'s suggestion.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-07 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042120#comment-16042120
 ] 

Rui Li commented on HIVE-16840:
---

In theory, RDD computation is done lazily and we're only retrieving the first 100 records. For sortByKey, the outputs of the upstream tasks are already sorted by key, so our single reducer should be doing a multi-way merge sort that can stop once we get 100 records. That was why I thought it better to handle orderBy+limit with 1 reducer.
However, looking at the shuffling code, it seems the computation is not 
performed lazily:
https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala#L105
https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala#L180
Therefore the reducer needs to fetch all the shuffled data.
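
To illustrate why a lazy multi-way merge over pre-sorted inputs could stop early, here is a plain-Scala sketch of the idea (not Spark's ExternalSorter):
{code}
import scala.collection.mutable

// lazy k-way merge over already-sorted iterators: elements are pulled only
// as the consumer demands them, so take(100) consumes only ~100 in total
def mergeSorted[T](parts: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] =
  new Iterator[T] {
    // min-heap of (head element, rest of that iterator)
    private val heap = mutable.PriorityQueue.empty[(T, Iterator[T])](
      Ordering.by[(T, Iterator[T]), T](_._1).reverse)
    parts.foreach(it => if (it.hasNext) heap.enqueue((it.next(), it)))
    def hasNext: Boolean = heap.nonEmpty
    def next(): T = {
      val (v, it) = heap.dequeue()
      if (it.hasNext) heap.enqueue((it.next(), it))
      v
    }
  }

// only about 100 elements are ever read from the million-element inputs
val top100 = mergeSorted(Seq(Iterator.range(0, 1000000), Iterator.range(1, 1000000))).take(100).toList
{code}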

I wonder whether anything can be done on the Spark side. I guess we can at least do some investigation.

As to the two proposals here, I prefer #2. IMO #1 may not even work, because 
once the sorted result set is stored into temp files, we can't rely on a 
{{select * limit N}} to retrieve the top N records.


[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-07 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041829#comment-16041829
 ] 

Xuefu Zhang commented on HIVE-16840:


[~kellyzly], I think you're right and I was confused: sortByKey does produce a global order. The reason you gave for that in the description is also accurate.

Looking back at your proposals, #1 is similar to a select over a subquery that outputs ordered data. It depends on SELECT following FETCH FIRST semantics, and I'm not sure that is reliable.

Proposal #2 seems more plausible, except for the zipWithIndex(), which could be expensive.

Maybe we can do a combination of the two: first, we do a parallel sort (with N partitions) but filter out all rows other than the first M of each partition (M is the limit), followed by another sort (with 1 partition) with a limit of M. This way, one task will sort only MxN rows, which should be fast if MxN is small. Basically we will do this:
{code}
val composite1 = sc.parallelize(1 to 200, 10)
  .map(p => (1 - p, p))
  .sortByKey(true, N)        // parallel sort into N partitions
  .mapPartitions(_.take(M))  // keep only the first M rows of each partition
  .sortByKey(true, 1)        // one final single-partition sort over MxN rows
  .take(M)
{code}
Could you please check if this is possible?



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-07 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041679#comment-16041679
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz]:
bq. sortByKey() only sorts keys within a partition.
I have not found this in the Spark documentation. In [SortByShuffler|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java#L51], a total-order sort is also implemented with the sortByKey Spark API. If I am wrong, please tell me.



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-07 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041432#comment-16041432
 ] 

Xuefu Zhang commented on HIVE-16840:


[~kellyzly], thanks for looking into this. I'm not sure either of your proposals works. sortByKey() only sorts keys within a partition. Thus, the moment you have a parallel sortByKey(), the data is not globally sorted. Along that line, I'm not sure either of your solutions will give you the expected result.

Both Spark and Hive have optimizations that sample the keys, partition the keys by range, and then sort within each partition, which gives you a global order. You can probably take a look at those to see if they help.
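
For illustration, a minimal sketch of that idea in Spark RDD terms (made-up data):
{code}
import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(1 to 100000).map(k => (k, k.toString))

// sample the keys to pick range boundaries, then sort within each of the
// 8 partitions; partition i only holds keys below those of partition i + 1,
// so reading the partitions in order yields a global order
val partitioner = new RangePartitioner(8, pairs)
val globallySorted = pairs.repartitionAndSortWithinPartitions(partitioner)
{code}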



[jira] [Commented] (HIVE-16840) Investigate the performance of order by limit in HoS

2017-06-06 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040150#comment-16040150
 ] 

liyunzhang_intel commented on HIVE-16840:
-

[~xuefuz], [~lirui], [~Ferd], [~csun]:
Here are 2 proposed solutions:
1. Add an extra reduce to save the ordered result, plus a new job to finish select * from (tmp result of order) limit N.
2. Create a SortByLimitShuffler and implement order by limit N like the following:
{code}
val composite1 = sc.parallelize(1 to 200, 10)
  .map(p => (1 - p, p))
  .sortByKey()
  .zipWithIndex()
  .filter { case (_, idx) => idx < 5 }
{code}
i.e., sortByKey + zipWithIndex + filter to implement order by limit. If we use this approach, we may need to remove the limit operator from the reduce tree.

I would appreciate any suggestions from you.
