[ https://issues.apache.org/jira/browse/IMPALA-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Ho reassigned IMPALA-5168:
----------------------------------

    Assignee: Michael Ho

> Codegen hash computation in DataStreamSender::Send for partition exchange. 
> ---------------------------------------------------------------------------
>
>                 Key: IMPALA-5168
>                 URL: https://issues.apache.org/jira/browse/IMPALA-5168
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Backend
>    Affects Versions: Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0, Impala 2.11.0
>            Reporter: Mostafa Mokhtar
>            Assignee: Michael Ho
>            Priority: Major
>              Labels: perfomance
>
> Hash-partition computation for exchange operators can benefit from codegen:
> profile data shows that ~20% of CPU time in the fragment thread is consumed by
> RawValue::GetHashValueFnv and ExprContext::GetValue.
> {code}
>     // hash-partition batch's rows across channels
>     int num_channels = channels_.size();
>     for (int i = 0; i < batch->num_rows(); ++i) {
>       TupleRow* row = batch->GetRow(i);
>       uint32_t hash_val = HashUtil::FNV_SEED;
>       for (int j = 0; j < partition_expr_ctxs_.size(); ++j) {
>         ExprContext* ctx = partition_expr_ctxs_[j];
>         void* partition_val = ctx->GetValue(row);
>         // We can't use the crc hash function here because it does not result
>         // in uncorrelated hashes with different seeds.  Instead we must use
>         // fnv hash.
>         // TODO: fix crc hash/GetHashValue()
>         hash_val =
>             RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), hash_val);
>       }
>       ExprContext::FreeLocalAllocations(partition_expr_ctxs_);
>       RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
>     }
> {code}
> ||Function Stack||Effective Time %||
> |Total|100%|
> | clone|99%|
> |  start_thread|99%|
> |   thread_proxy|99%|
> |    boost::detail::thread_data<boost::_bi::bind_t<>::run|99%|
> |     boost::_bi::bind_t<void, void (*)(), ::operator()|99%|
> |      operator()<void (*)(const std::basic_string<|99%|
> |       impala::Thread::SuperviseThread|99%|
> |        boost::function0<void>::operator()|99%|
> |         impala::QueryExecMgr::ExecFInstance|99%|
> |          impala::FragmentInstanceState::Exec|99%|
> |           impala::PlanFragmentExecutor::Exec|99%|
> |            impala::PlanFragmentExecutor::ExecInternal|96%|
> |             impala::DataStreamSender::Send|91%|
> |              impala::DataStreamSender::Channel::AddRow|56%|
> |              impala::RawValue::GetHashValueFnv|11%|
> |              impala::ExprContext::GetValue|11%|
> |              impala::ExprContext::FreeLocalAllocations|6%|
> |              impala::RowBatch::GetRow|1%|
> |              std::vector<impala::ExprContext*, std::allocator<impala::ExprContext*>>::size|1%|
> |              impala::Expr::type|0%|
> |              impala::ExprContext::GetValue|0%|
> |              impala::RuntimeState::CheckQueryState|0%|
> |             impala::HdfsScanNode::GetNext|3%|
> |             impala::RowBatch::Reset|1%|
> |             Status|0%|
> |             ~ScopedTimer|0%|
> |            [Unknown stack frame(s)]|4%|
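The ~20% figure in the description can be read off the two hash-related rows of the profile. These are inclusive percentages of the fragment thread, as listed. Adding the per-row `ExprContext::FreeLocalAllocations` call brings the total to 28%:

$$
\underbrace{11\%}_{\texttt{RawValue::GetHashValueFnv}} + \underbrace{11\%}_{\texttt{ExprContext::GetValue}} = 22\% \approx 20\%,
\qquad
22\% + \underbrace{6\%}_{\texttt{FreeLocalAllocations}} = 28\%
$$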
> Query used in the repro:
> {code}
> select /* +straight_join */ count(*)
> from store_sales a
>   join /* +shuffle */ store_returns b
>     on a.ss_item_sk = b.sr_item_sk
> where a.ss_ticket_number = b.sr_ticket_number
>   and ss_sold_date_sk between 2450816 and 2451500
>   and sr_returned_date_sk between 2450816 and 2451500
> group by a.ss_ticket_number
> having count(*) > 9999999999
> {code}
> Explain plan:
> {code}
> +------------------------------------------------------------------------------------------+
> | Explain String                                                                           |
> +------------------------------------------------------------------------------------------+
> | Estimated Per-Host Requirements: Memory=3.43GB VCores=3                                  |
> |                                                                                          |
> | PLAN-ROOT SINK                                                                           |
> | |                                                                                        |
> | 08:EXCHANGE [UNPARTITIONED]                                                              |
> | |                                                                                        |
> | 07:AGGREGATE [FINALIZE]                                                                  |
> | |  output: count:merge(*)                                                                |
> | |  group by: a.ss_ticket_number                                                          |
> | |  having: count(*) > 9999999999                                                         |
> | |                                                                                        |
> | 06:EXCHANGE [HASH(a.ss_ticket_number)]                                                   |
> | |                                                                                        |
> | 03:AGGREGATE [STREAMING]                                                                 |
> | |  output: count(*)                                                                      |
> | |  group by: a.ss_ticket_number                                                          |
> | |                                                                                        |
> | 02:HASH JOIN [INNER JOIN, PARTITIONED]                                                   |
> | |  hash predicates: a.ss_item_sk = b.sr_item_sk, a.ss_ticket_number = b.sr_ticket_number |
> | |  runtime filters: RF000 <- b.sr_item_sk, RF001 <- b.sr_ticket_number                   |
> | |                                                                                        |
> | |--05:EXCHANGE [HASH(b.sr_item_sk,b.sr_ticket_number)]                                   |
> | |  |                                                                                     |
> | |  01:SCAN HDFS [tpcds_3000_parquet.store_returns b]                                     |
> | |     partitions=681/2004 files=681 size=13.73GB                                         |
> | |                                                                                        |
> | 04:EXCHANGE [HASH(a.ss_item_sk,a.ss_ticket_number)]                                      |
> | |                                                                                        |
> | 00:SCAN HDFS [tpcds_3000_parquet.store_sales a]                                          |
> |    partitions=683/1824 files=944 size=140.19GB                                           |
> |    runtime filters: RF000 -> a.ss_item_sk, RF001 -> a.ss_ticket_number                   |
> +------------------------------------------------------------------------------------------+
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)