[ https://issues.apache.org/jira/browse/IMPALA-12233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742474#comment-17742474 ]
Gergely Fürnstáhl edited comment on IMPALA-12233 at 7/12/23 3:46 PM:
---------------------------------------------------------------------
[~joemcdonnell] I agree, but I am still having some trouble with this approach: I don't really know where to call Unregister(). My best idea is to call it in PHJNode::Close(), after checking whether the node is already closed, and to participate with it in the current synchronization cycle.

This will work for one cycle (after I fix the case where all threads have unregistered), but could cause trouble on the next cycle. E.g. with 4 threads:

Thread 1: PHJNode::Close() -> Unregister()
Thread 2: PHJNode::SomeFunction() -> Wait()
Thread 3: PHJNode::SomeFunction() -> Wait()
Thread 4: PHJNode::SomeFunction() -> Wait()

One of threads 2-4 will execute the function, all of them will continue on, and thread 1 will decrease num_threads to 3 in the barrier and continue closing down the PHJNode. But on the next cycle this won't work anymore if only some of the threads want to close the node again:

Thread 2: PHJNode::Close() -> no Unregister() call happens, as the node is already closed
Thread 3: PHJNode::SomeFunction() -> Wait()
Thread 4: PHJNode::SomeFunction() -> Wait()

In this case we get stuck again, though I am not sure whether this can actually happen with PHJNode.

A different approach would be to put Unregister() before "if (is_closed()) return;", but PHJNode::Close() can be and will be called multiple times in that case, once by StreamingAggregationNode::GetRowsStreaming() and once by StreamingAggregationNode::Close(), so it would need some other method of thread synchronization to guarantee one Unregister() per thread.
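The unregister-in-Close idea above can be sketched as follows. This is a hypothetical, simplified barrier, not Impala's actual CyclicBarrier API; SimpleCyclicBarrier and RunScenario are illustration-only names. The point is that Unregister() shrinks the participant count, so the remaining threads' Wait() calls complete both in the current cycle and in later ones:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical sketch, not Impala's real CyclicBarrier: a cyclic barrier
// whose participant count can shrink via Unregister(), so a thread that
// closes early stops being waited on in this and every later cycle.
class SimpleCyclicBarrier {
 public:
  explicit SimpleCyclicBarrier(int num_threads) : num_threads_(num_threads) {}

  // Block until every currently registered thread has arrived.
  void Wait() {
    std::unique_lock<std::mutex> l(lock_);
    int my_cycle = cycle_;
    if (++num_waiting_ >= num_threads_) {
      // Last arrival: start the next cycle and release everyone.
      num_waiting_ = 0;
      ++cycle_;
      cv_.notify_all();
    } else {
      cv_.wait(l, [&] { return cycle_ != my_cycle; });
    }
  }

  // Permanently remove one participant. If the leaving thread was the last
  // one the current waiters needed, complete the cycle on its behalf.
  void Unregister() {
    std::unique_lock<std::mutex> l(lock_);
    --num_threads_;
    if (num_waiting_ >= num_threads_ && num_threads_ > 0) {
      num_waiting_ = 0;
      ++cycle_;
      cv_.notify_all();
    }
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  int num_threads_;     // currently registered participants
  int num_waiting_ = 0; // arrivals in the current cycle
  int cycle_ = 0;       // generation counter, bumped each completed cycle
};

// Replays the 4-thread example from the comment: thread 1 unregisters in
// Close() while threads 2-4 call Wait(); the next cycle then only needs
// the three remaining threads, so nobody gets stuck.
bool RunScenario() {
  SimpleCyclicBarrier barrier(4);
  std::vector<std::thread> threads;
  threads.emplace_back([&] { barrier.Unregister(); });  // thread 1: Close()
  for (int i = 0; i < 3; ++i) threads.emplace_back([&] { barrier.Wait(); });
  for (auto& t : threads) t.join();
  threads.clear();
  // Second cycle: only threads 2-4 arrive, and Wait() still completes.
  for (int i = 0; i < 3; ++i) threads.emplace_back([&] { barrier.Wait(); });
  for (auto& t : threads) t.join();
  return true;
}
```

This sketch does not by itself solve the once-per-thread problem raised above; one possible guard would be a per-node flag (e.g. a hypothetical `bool unregistered_` checked and set before the `is_closed()` early return), since each node instance is driven by a single fragment thread.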
> Partitioned hash join with a limit can hang when using mt_dop>0
> ---------------------------------------------------------------
>
>                 Key: IMPALA-12233
>                 URL: https://issues.apache.org/jira/browse/IMPALA-12233
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Backend
>    Affects Versions: Impala 4.3.0
>            Reporter: Joe McDonnell
>            Assignee: Gergely Fürnstáhl
>            Priority: Blocker
>
> After encountering a hung query on an Impala cluster, we were able to reproduce it in the Impala developer environment with these steps:
> {noformat}
> use tpcds;
> set mt_dop=2;
> select ss_cdemo_sk from store_sales where ss_sold_date_sk = (select max(ss_sold_date_sk) from store_sales) group by ss_cdemo_sk limit 1;{noformat}
> The problem reproduces with limit values up to 183, then at limit 184 and higher it doesn't reproduce.
> Taking stack traces shows a thread waiting for a cyclic barrier:
> {noformat}
> 0  libpthread.so.0!__pthread_cond_wait + 0x216
> 1  impalad!impala::CyclicBarrier::Wait<impala::PhjBuilder::DoneProbingHashPartitions(const int64_t*, impala::BufferPool::ClientHandle*, impala::RuntimeProfile*, std::deque<std::unique_ptr<impala::PhjBuilderPartition> >*, impala::RowBatch*)::<lambda()> > [condition-variable.h : 49 + 0xc]
> 2  impalad!impala::PhjBuilder::DoneProbingHashPartitions(long const*, impala::BufferPool::ClientHandle*, impala::RuntimeProfile*, std::deque<std::unique_ptr<impala::PhjBuilderPartition, std::default_delete<impala::PhjBuilderPartition> >, std::allocator<std::unique_ptr<impala::PhjBuilderPartition, std::default_delete<impala::PhjBuilderPartition> > > >*, impala::RowBatch*) [partitioned-hash-join-builder.cc : 766 + 0x25]
> 3  impalad!impala::PartitionedHashJoinNode::DoneProbing(impala::RuntimeState*, impala::RowBatch*) [partitioned-hash-join-node.cc : 1189 + 0x28]
> 4  impalad!impala::PartitionedHashJoinNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) [partitioned-hash-join-node.cc : 599 + 0x15]
> 5  impalad!impala::StreamingAggregationNode::GetRowsStreaming(impala::RuntimeState*, impala::RowBatch*) [streaming-aggregation-node.cc : 115 + 0x14]
> 6  impalad!impala::StreamingAggregationNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) [streaming-aggregation-node.cc : 77 + 0x15]
> 7  impalad!impala::FragmentInstanceState::ExecInternal() [fragment-instance-state.cc : 446 + 0x15]
> 8  impalad!impala::FragmentInstanceState::Exec() [fragment-instance-state.cc : 104 + 0xf]
> 9  impalad!impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) [query-state.cc : 956 + 0xf]{noformat}
> Adding some debug logging around locations that go through that cyclic barrier, we see one impalad where the barrier is expecting two threads and only one arrives:
> {noformat}
> I0621 18:28:19.926551 210363 partitioned-hash-join-builder.cc:766] 2a4787b28425372d:ac6bd96200000004] DoneProbingHashPartitions: num_probe_threads_=2
> I0621 18:28:19.927855 210362 streaming-aggregation-node.cc:136] 2a4787b28425372d:ac6bd96200000003] the number of rows (93) returned from the streaming aggregation node has exceeded the limit of 1
> I0621 18:28:19.928887 210362 query-state.cc:958] 2a4787b28425372d:ac6bd96200000003] Instance completed. instance_id=2a4787b28425372d:ac6bd96200000003 #in-flight=4 status=OK{noformat}
> Other instances that don't have a stuck thread see both threads arrive:
> {noformat}
> I0621 18:28:19.926223 210358 partitioned-hash-join-builder.cc:766] 2a4787b28425372d:ac6bd96200000005] DoneProbingHashPartitions: num_probe_threads_=2
> I0621 18:28:19.926326 210359 partitioned-hash-join-builder.cc:766] 2a4787b28425372d:ac6bd96200000006] DoneProbingHashPartitions: num_probe_threads_=2{noformat}
> So, there must be a codepath that skips going through the cyclic barrier.
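The hang described in the report can be modeled with a deliberately timed-out wait. This is an illustrative sketch, not Impala's code; TimedBarrier and ProbeThreadsDeadlock are hypothetical names, and the timeout exists only so the demo terminates (the real CyclicBarrier::Wait blocks indefinitely). The barrier expects both probe threads, but one instance hits the limit and completes without ever calling Wait(), leaving the other stuck:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot barrier that expects `expected` arrivals.
// Wait() gives up after a timeout so this demo can terminate.
class TimedBarrier {
 public:
  explicit TimedBarrier(int expected) : expected_(expected) {}

  // Returns true if all expected threads arrived, false on timeout.
  bool Wait(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(lock_);
    ++arrived_;
    cv_.notify_all();
    return cv_.wait_for(l, timeout, [&] { return arrived_ >= expected_; });
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  int expected_;
  int arrived_ = 0;
};

// Models the bug: the barrier expects both probe threads
// (num_probe_threads_=2), but one of them exits early because of the
// limit and never passes through the barrier.
bool ProbeThreadsDeadlock() {
  TimedBarrier barrier(2);
  bool timed_out = false;
  std::thread t1([&] {
    // Thread 1: reaches DoneProbingHashPartitions and waits.
    timed_out = !barrier.Wait(std::chrono::milliseconds(100));
  });
  std::thread t2([] {
    // Thread 2: "the number of rows returned ... has exceeded the limit";
    // the instance completes without ever calling Wait().
  });
  t1.join();
  t2.join();
  return timed_out;  // true: the waiting thread was never released
}
```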
--
This message was sent by Atlassian Jira (v8.20.10#820010)