[ https://issues.apache.org/jira/browse/IMPALA-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Ho reassigned IMPALA-5168:
----------------------------------

    Assignee: Michael Ho

> Codegen hash computation in DataStreamSender::Send for partition exchange.
> ---------------------------------------------------------------------------
>
>                 Key: IMPALA-5168
>                 URL: https://issues.apache.org/jira/browse/IMPALA-5168
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Backend
>    Affects Versions: Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0, Impala 2.11.0
>            Reporter: Mostafa Mokhtar
>            Assignee: Michael Ho
>            Priority: Major
>              Labels: perfomance
>
> Hash partition computation for exchange operators can benefit from codegen: profile data shows that ~20% of CPU in the fragment thread is consumed by RawValue::GetHashValueFnv & ExprContext::GetValue.
> {code}
>     // hash-partition batch's rows across channels
>     int num_channels = channels_.size();
>     for (int i = 0; i < batch->num_rows(); ++i) {
>       TupleRow* row = batch->GetRow(i);
>       uint32_t hash_val = HashUtil::FNV_SEED;
>       for (int i = 0; i < partition_expr_ctxs_.size(); ++i) {
>         ExprContext* ctx = partition_expr_ctxs_[i];
>         void* partition_val = ctx->GetValue(row);
>         // We can't use the crc hash function here because it does not result
>         // in uncorrelated hashes with different seeds. Instead we must use
>         // fnv hash.
>         // TODO: fix crc hash/GetHashValue()
>         hash_val =
>             RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), hash_val);
>       }
>       ExprContext::FreeLocalAllocations(partition_expr_ctxs_);
>       RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
>     }
> {code}
> |Function Stack| Effective Time % |
> |Total|100%|
> | clone|99%|
> | start_thread|99%|
> | thread_proxy|99%|
> | boost::detail::thread_data<boost::_bi::bind_t<>::run|99%|
> | boost::_bi::bind_t<void, void (*)(), ::operator()|99%|
> | operator()<void (*)(const std::basic_string<|99%|
> | impala::Thread::SuperviseThread|99%|
> | boost::function0<void>::operator()|99%|
> | impala::QueryExecMgr::ExecFInstance|99%|
> | impala::FragmentInstanceState::Exec|99%|
> | impala::PlanFragmentExecutor::Exec|99%|
> | impala::PlanFragmentExecutor::ExecInternal|96%|
> | impala::DataStreamSender::Send|91%|
> | impala::DataStreamSender::Channel::AddRow|56%|
> | impala::RawValue::GetHashValueFnv|11%|
> | impala::ExprContext::GetValue|11%|
> | impala::ExprContext::FreeLocalAllocations|6%|
> | impala::RowBatch::GetRow|1%|
> | std::vector<impala::ExprContext*, std::allocator<impala::ExprContext*>>::size|1%|
> | impala::Expr::type|0%|
> | impala::ExprContext::GetValue|0%|
> | impala::RuntimeState::CheckQueryState|0%|
> | impala::HdfsScanNode::GetNext|3%|
> | impala::RowBatch::Reset|1%|
> | Status|0%|
> | ~ScopedTimer|0%|
> | [Unknown stack frame(s)]|4%|
>
> Query used in repro
> {code}
> select /* +straight_join */ count(*)
> from store_sales a join /* +shuffle */
> store_returns b on
> a.ss_item_sk = b.sr_item_sk
> where a.ss_ticket_number = b.sr_ticket_number and ss_sold_date_sk between 2450816 and 2451500 and sr_returned_date_sk between 2450816 and 2451500
> group by a.ss_ticket_number
> having count(*) > 9999999999
> {code}
>
> Explain plan
> {code}
> +------------------------------------------------------------------------------------------+
> | Explain String                                                                           |
> +------------------------------------------------------------------------------------------+
> | Estimated Per-Host Requirements: Memory=3.43GB VCores=3                                  |
> |                                                                                          |
> | PLAN-ROOT SINK                                                                           |
> | |                                                                                        |
> | 08:EXCHANGE [UNPARTITIONED]                                                              |
> | |                                                                                        |
> | 07:AGGREGATE [FINALIZE]                                                                  |
> | |  output: count:merge(*)                                                                |
> | |  group by: a.ss_ticket_number                                                          |
> | |  having: count(*) > 9999999999                                                         |
> | |                                                                                        |
> | 06:EXCHANGE [HASH(a.ss_ticket_number)]                                                   |
> | |                                                                                        |
> | 03:AGGREGATE [STREAMING]                                                                 |
> | |  output: count(*)                                                                      |
> | |  group by: a.ss_ticket_number                                                          |
> | |                                                                                        |
> | 02:HASH JOIN [INNER JOIN, PARTITIONED]                                                   |
> | |  hash predicates: a.ss_item_sk = b.sr_item_sk, a.ss_ticket_number = b.sr_ticket_number |
> | |  runtime filters: RF000 <- b.sr_item_sk, RF001 <- b.sr_ticket_number                   |
> | |                                                                                        |
> | |--05:EXCHANGE [HASH(b.sr_item_sk,b.sr_ticket_number)]                                   |
> | |  |                                                                                     |
> | |  01:SCAN HDFS [tpcds_3000_parquet.store_returns b]                                     |
> | |     partitions=681/2004 files=681 size=13.73GB                                         |
> | |                                                                                        |
> | 04:EXCHANGE [HASH(a.ss_item_sk,a.ss_ticket_number)]                                      |
> | |                                                                                        |
> | 00:SCAN HDFS [tpcds_3000_parquet.store_sales a]                                          |
> |    partitions=683/1824 files=944 size=140.19GB                                           |
> |    runtime filters: RF000 -> a.ss_item_sk, RF001 -> a.ss_ticket_number                   |
> +------------------------------------------------------------------------------------------+
> {code}
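
For illustration only, here is a minimal C++ sketch of the kind of specialization the issue asks for. It is not Impala's code and not how Impala generates code. It assumes a hypothetical `Row` struct holding the two join keys from the repro query as `int64_t`, and it uses the standard 32-bit FNV-1a constants rather than whatever `HashUtil` defines. Because the column types are fixed at compile time, the per-row loop needs no `ExprContext::GetValue` call and no per-value type dispatch, which is roughly the overhead the profile attributes to `GetHashValueFnv` and `GetValue`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Standard 32-bit FNV-1a constants (an assumption; not taken from Impala's HashUtil).
constexpr uint32_t kFnvOffsetBasis = 2166136261u;
constexpr uint32_t kFnvPrime = 16777619u;

// Hashes `len` bytes starting from `seed`, so calls can be chained across columns.
inline uint32_t FnvHash(const void* data, size_t len, uint32_t seed) {
  const uint8_t* p = static_cast<const uint8_t*>(data);
  uint32_t h = seed;
  for (size_t i = 0; i < len; ++i) {
    h ^= p[i];
    h *= kFnvPrime;
  }
  return h;
}

// Hypothetical row type: the two partition columns with types known at compile time.
struct Row {
  int64_t item_sk;
  int64_t ticket_number;
};

// Chains the hash over both partition columns, then maps it to a channel index.
inline int ChannelFor(const Row& row, int num_channels) {
  assert(num_channels > 0);
  uint32_t h = kFnvOffsetBasis;
  h = FnvHash(&row.item_sk, sizeof(row.item_sk), h);
  h = FnvHash(&row.ticket_number, sizeof(row.ticket_number), h);
  return static_cast<int>(h % static_cast<uint32_t>(num_channels));
}

// Splits a batch of rows into one bucket per channel.
inline std::vector<std::vector<Row>> Partition(const std::vector<Row>& rows,
                                               int num_channels) {
  std::vector<std::vector<Row>> channels(num_channels);
  for (const Row& row : rows) {
    channels[ChannelFor(row, num_channels)].push_back(row);
  }
  return channels;
}
```

Calling `Partition(rows, 4)` returns four buckets. Rows with equal keys always land in the same bucket, which is the property a hash-partitioned exchange relies on.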