Mostafa Mokhtar created IMPALA-6746: ---------------------------------------
Summary: Reduce the number of comparisons for analytical functions with partitioning when incoming data is clustered
Key: IMPALA-6746
URL: https://issues.apache.org/jira/browse/IMPALA-6746
Project: IMPALA
Issue Type: Improvement
Components: Backend
Affects Versions: Impala 2.13.0
Reporter: Mostafa Mokhtar
Assignee: Tianyi Wang
Attachments: percentile query profile 2.txt

Checking whether the current row belongs to the same partition in ANALYTIC is very expensive. It does N comparisons, where N is the number of rows.

The input to the ANALYTIC nodes is sorted on the partition column(s) (see 01:SORT in the plan below). When the cardinality of those columns is relatively small, the values are therefore clustered into long runs, and most row batches fall entirely within a single partition. Here the plan estimates 2,559 l_commitdate groups over 59,986,052 rows.

One optimization, as proposed by [~alex.behm], is to check the first and last tuples in the batch. If they match, we can avoid calling AnalyticEvalNode::PrevRowCompare for the entire batch. Because the input is sorted, equal first and last tuples imply that every tuple in between belongs to the same partition.

For the attached query, which is a common pattern, the expected speedup is 20-30%.

Query
{code}
select l_commitdate
      ,avg(l_extendedprice) as avg_perc
      ,percentile_cont (.25) within group (order by l_extendedprice asc) as perc_25
      ,percentile_cont (.5) within group (order by l_extendedprice asc) as perc_50
      ,percentile_cont (.75) within group (order by l_extendedprice asc) as perc_75
      ,percentile_cont (.90) within group (order by l_extendedprice asc) as perc_90
from lineitem
group by l_commitdate
order by l_commitdate
{code}

Plan
{code}
F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
|  mem-estimate=0B mem-reservation=0B
|
09:MERGING-EXCHANGE [UNPARTITIONED]
|  order by: l_commitdate ASC
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=5 row-size=66B cardinality=2559
|
F02:PLAN FRAGMENT [HASH(l_commitdate)] hosts=1 instances=1
Per-Host Resources: mem-estimate=22.00MB mem-reservation=13.94MB
05:SORT
|  order by: l_commitdate ASC
|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB
|  tuple-ids=5 row-size=66B cardinality=2559
|
08:AGGREGATE [FINALIZE]
|  output: avg:merge(l_extendedprice), _percentile_cont_interpolation:merge(l_extendedprice, `_percentile_row_number_diff_0`), _percentile_cont_interpolation:merge(l_extendedprice, `_percentile_row_number_diff_1`), _percentile_cont_interpolation:merge(l_extendedprice, `_percentile_row_number_diff_2`), _percentile_cont_interpolation:merge(l_extendedprice, `_percentile_row_number_diff_3`)
|  group by: l_commitdate
|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
|  tuple-ids=4 row-size=66B cardinality=2559
|
07:EXCHANGE [HASH(l_commitdate)]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=3 row-size=66B cardinality=2559
|
F01:PLAN FRAGMENT [HASH(l_commitdate)] hosts=1 instances=1
Per-Host Resources: mem-estimate=64.00MB mem-reservation=22.00MB
04:AGGREGATE [STREAMING]
|  output: avg(l_extendedprice), _percentile_cont_interpolation(l_extendedprice, row_number() - 1 - count(l_extendedprice) - 1 * 0.25), _percentile_cont_interpolation(l_extendedprice, row_number() - 1 - count(l_extendedprice) - 1 * 0.5), _percentile_cont_interpolation(l_extendedprice, row_number() - 1 - count(l_extendedprice) - 1 * 0.75), _percentile_cont_interpolation(l_extendedprice, row_number() - 1 - count(l_extendedprice) - 1 * 0.90)
|  group by: l_commitdate
|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
|  tuple-ids=3 row-size=66B cardinality=2559
|
03:ANALYTIC
|  functions: count(l_extendedprice)
|  partition by: l_commitdate
|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
|  tuple-ids=9,7,8 row-size=50B cardinality=59986052
|
02:ANALYTIC
|  functions: row_number()
|  partition by: l_commitdate
|  order by: l_extendedprice ASC
|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
|  tuple-ids=9,7 row-size=42B cardinality=59986052
|
01:SORT
|  order by: l_commitdate ASC NULLS FIRST, l_extendedprice ASC NULLS LAST
|  mem-estimate=46.00MB mem-reservation=12.00MB spill-buffer=2.00MB
|  tuple-ids=9 row-size=34B cardinality=59986052
|
06:EXCHANGE [HASH(l_commitdate)]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=0 row-size=34B cardinality=59986052
|
F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
00:SCAN HDFS [tpch_10_parquet.lineitem, RANDOM]
   partitions=1/1 files=15 size=2.05GB
   stored statistics:
     table: rows=59986052 size=2.05GB
     columns: all
   extrapolated-rows=disabled
   mem-estimate=88.00MB mem-reservation=0B
   tuple-ids=0 row-size=34B cardinality=59986052
{code}

Call stack
{code}
libc.so.6!__memcmp_sse4_1 - memcmp-sse4.S
impalad!StringCompare+0x14 - string-value.inline.h:40
impalad!impala::StringValue::Eq+0x23 - string-value.inline.h:62
impalad!impala::StringValue::operator==+0 - string-value.inline.h:66
impalad!impala::Operators::Eq_StringVal_StringVal+0xd - operators-ir.cc:227
impalad!impala::ScalarFnCall::InterpretEval<impala_udf::BooleanVal>+0x597 - scalar-fn-call.cc:485
impalad!impala::ScalarFnCall::GetBooleanVal+0x24 - scalar-fn-call.cc:536
impalad!impala::AndPredicate::GetBooleanVal+0x4d - compound-predicates.cc:36
impalad!impala::OrPredicate::GetBooleanVal+0x4d - compound-predicates.cc:56
impalad!impala::AndPredicate::GetBooleanVal+0x29 - compound-predicates.cc:33
impalad!impala::ScalarExprEvaluator::GetBooleanVal+0x16 - scalar-expr-evaluator.cc:368
impalad!impala::AnalyticEvalNode::PrevRowCompare+0xb - analytic-eval-node.cc:591
impalad!impala::AnalyticEvalNode::ProcessChildBatch+0x227 - analytic-eval-node.cc:644
impalad!impala::AnalyticEvalNode::ProcessChildBatches+0xf5 - analytic-eval-node.cc:604
impalad!impala::AnalyticEvalNode::GetNext+0x269 - analytic-eval-node.cc:786
impalad!impala::AnalyticEvalNode::ProcessChildBatches+0xbf - analytic-eval-node.cc:602
impalad!impala::AnalyticEvalNode::GetNext+0x269 - analytic-eval-node.cc:786
impalad!impala::PartitionedAggregationNode::GetRowsStreaming+0xa6 - partitioned-aggregation-node.cc:478
impalad!impala::PartitionedAggregationNode::GetNext+0x221 - partitioned-aggregation-node.cc:369
impalad!impala::FragmentInstanceState::ExecInternal+0x1b1 - fragment-instance-state.cc:277
impalad!impala::FragmentInstanceState::Exec+0x29e - fragment-instance-state.cc:89
impalad!impala::QueryState::ExecFInstance+0x249 - query-state.cc:394
impalad!boost::function0<void>::operator()+0x1a - function_template.hpp:767
impalad!impala::Thread::SuperviseThread+0x2e4 - thread.cc:356
impalad!operator()<void (*)(const std::basic_string<char>&, const std::basic_string<char>&, boost::function<void()>, const impala::ThreadDebugInfo*, impala::Promise<long int>*), boost::_bi::list0>+0x5b - bind.hpp:525
impalad!boost::_bi::bind_t<void, void (*)(std::string const&, std::string const&, boost::function<void (void)>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::string>, boost::_bi::value<std::string>, boost::_bi::value<boost::function<void (void)>>, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*>>>::operator()+0 - bind_template.hpp:20
impalad!boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::string const&, std::string const&, boost::function<void (void)>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::string>, boost::_bi::value<std::string>, boost::_bi::value<boost::function<void (void)>>, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*>>>>::run+0x1e - thread.hpp:116
impalad!thread_proxy+0xd9 - [Unknown]:[Unknown]
libpthread.so.0!start_thread+0xc1 - pthread_create.c:312
libc.so.6!__clone+0x6c - clone.S:111
{code}
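Below is a minimal C++ sketch of the first/last-tuple check proposed in the description. It assumes rows arrive sorted (or at least contiguously grouped) on the partition key, which 01:SORT guarantees in this plan.

`Row`, `CompareCounter`, `PartitionStartsNaive` and `PartitionStartsClustered` are hypothetical names, not Impala's `AnalyticEvalNode` API. The key is modeled as a plain string, and NULL handling (the null-safe And/Or predicate visible in the call stack) is omitted. One detail is an assumption beyond the proposal as written: the first row of a batch is still compared with the last row of the previous batch, since a partition boundary can fall exactly between batches.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Sketch only: all names are hypothetical, not Impala's AnalyticEvalNode API.
// Simplified row holding only the PARTITION BY column; NULL handling omitted.
struct Row {
  std::string partition_key;
};

// Counts partition-key comparisons so the two strategies can be compared.
struct CompareCounter {
  std::size_t count = 0;
  bool Equal(const Row& a, const Row& b) {
    ++count;
    return a.partition_key == b.partition_key;
  }
};

// Baseline: compare every row with its predecessor, one comparison per row,
// analogous to calling PrevRowCompare for each row. Returns the indices in
// `batch` where a new partition starts. `prev_last` is the last row of the
// previous batch, or empty for the very first batch.
std::vector<std::size_t> PartitionStartsNaive(const std::vector<Row>& batch,
                                              const std::optional<Row>& prev_last,
                                              CompareCounter& cmp) {
  std::vector<std::size_t> starts;
  for (std::size_t i = 0; i < batch.size(); ++i) {
    const Row* prev = nullptr;
    if (i > 0) {
      prev = &batch[i - 1];
    } else if (prev_last) {
      prev = &*prev_last;
    }
    // A row with no predecessor at all always starts a partition.
    if (prev == nullptr || !cmp.Equal(*prev, batch[i])) starts.push_back(i);
  }
  return starts;
}

// Shortcut: because each partition is contiguous in the sorted input, equal
// first and last rows mean the whole batch lies in one partition. Only row 0
// can then start a new partition, which costs one comparison against the
// previous batch. Otherwise fall back to the row-by-row check.
std::vector<std::size_t> PartitionStartsClustered(const std::vector<Row>& batch,
                                                  const std::optional<Row>& prev_last,
                                                  CompareCounter& cmp) {
  if (batch.size() > 1 && cmp.Equal(batch.front(), batch.back())) {
    std::vector<std::size_t> starts;
    if (!prev_last || !cmp.Equal(*prev_last, batch.front())) starts.push_back(0);
    return starts;
  }
  return PartitionStartsNaive(batch, prev_last, cmp);
}
```

Consider a 1,024-row batch whose rows all share one l_commitdate value, following a batch with a different value. The row-by-row version performs 1,024 comparisons, while the shortcut performs 2 and reports the same partition start (index 0). When a batch does contain a boundary, the shortcut costs one extra comparison before falling back.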