Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 2:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc@141
PS2, Line 141:   while (true) {
:     // wait until something shows up or we know we're done
:     while (!is_cancelled_ && batch_queue_.empty() && blocked_senders_.empty()
:         && num_remaining_senders_ > 0) {
:       VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
:                << " node=" << recvr_->dest_node_id();
:       // Don't count time spent waiting on the sender as active time.
:       CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
:       CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
:       CANCEL_SAFE_SCOPED_TIMER(
:           received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
:           &is_cancelled_);
:       data_arrival_cv_.wait(l);
:     }
: 
:     if (is_cancelled_) return Status::CANCELLED;
: 
:     if (blocked_senders_.empty() && batch_queue_.empty()) {
:       DCHECK_EQ(num_remaining_senders_, 0);
:       return Status::OK();
:     }
: 
:     received_first_batch_ = true;
: 
:     // Either we'll consume a row batch from batch_queue_, or it's empty. In either case,
:     // take a blocked sender and retry delivering their batch. There is a window between
:     // which a deferred batch is dequeued from blocked_senders_ queue and when it's
:     // inserted into batch_queue_. However, a receiver won't respond to the sender until
:     // the deferred row batch has been inserted. The sender will wait for all in-flight
:     // RPCs to complete before sending EOS RPC so num_remaining_senders_ should be > 0.
:     if (!blocked_senders_.empty()) {
:       recvr_->mgr_->EnqueueRowBatch(
:           {recvr_->fragment_instance_id(), move(blocked_senders_.front())});
:       blocked_senders_.pop();
:     }
: 
:     if (!batch_queue_.empty()) {
:       RowBatch* result = batch_queue_.front().second;
:       recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
:       VLOG_ROW << "fetched #rows=" << result->num_rows();
:       current_batch_.reset(result);
:       *next_batch = current_batch_.get();
:       batch_queue_.pop_front();
:       return Status::OK();
:     }

> This loop may lead to live lock in the rare case in which blocked_senders_

Actually, I misread this in the heat of debugging. If both queues are empty, we return early at line 160 above when num_remaining_senders_ == 0, so we shouldn't spin forever. Otherwise, the thread sleeps and waits at line 153.

This loop does have the unfortunate behavior of popping all entries off blocked_senders_ before dropping the lock and sleeping at line 153.

Although there is a window in which both queues are empty while a row batch is being deserialized and moved from blocked_senders_ to batch_queue_, it should be impossible for num_remaining_senders_ to reach 0 in that window. The reason is that the receiver does not respond to the sender of that row batch until after the batch has been inserted into batch_queue_ (which happens after it has been popped from blocked_senders_). So batch_queue_ becomes non-empty before the remote sender gets a reply, and the sender cannot send its EOS RPC before then.

-- 
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Fri, 06 Oct 2017 22:03:51 +0000
Gerrit-HasComments: Yes
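
The ordering argument in the comment above (insert into batch_queue_ first, reply to the sender second, EOS only after all replies) can be checked with a small single-threaded model. This is only a sketch under stated assumptions: SenderQueueModel, in_flight, unanswered_rpcs, DeferBatch, FinishDeferredBatch, TrySendEos and GetBatchStep are hypothetical names invented for illustration, not part of the patch, and the condition-variable wait is modelled as a return value instead of a real block.

```cpp
#include <cassert>
#include <deque>
#include <queue>

// Hypothetical model of the receiver's per-sender queue. Field names echo the
// quoted code, but none of this is Impala code.
enum class Outcome { kWait, kEos, kBatch };

struct SenderQueueModel {
  std::deque<int> batch_queue;      // batches ready to be consumed
  std::queue<int> blocked_senders;  // deferred batches, sender not yet answered
  std::queue<int> in_flight;        // popped for retry, not yet in batch_queue
  int num_remaining_senders = 1;
  int unanswered_rpcs = 0;          // RPCs the sender is still waiting on

  // Sender side: the batch could not be enqueued right away, so it is deferred.
  void DeferBatch(int batch) {
    blocked_senders.push(batch);
    ++unanswered_rpcs;
  }

  // Deserialization side: insert the batch first, reply to the sender second.
  void FinishDeferredBatch() {
    assert(!in_flight.empty());
    batch_queue.push_back(in_flight.front());
    in_flight.pop();
    --unanswered_rpcs;  // the reply is only sent after the insert above
  }

  // Sender side: EOS goes out only once every in-flight RPC has been answered.
  bool TrySendEos() {
    if (unanswered_rpcs > 0) return false;
    --num_remaining_senders;
    return true;
  }

  // Consumer loop. kWait stands in for blocking on the condition variable.
  Outcome GetBatchStep(int* out) {
    while (true) {
      if (batch_queue.empty() && blocked_senders.empty()) {
        // Both queues empty: sleep if senders remain, otherwise we are done.
        return num_remaining_senders > 0 ? Outcome::kWait : Outcome::kEos;
      }
      if (!blocked_senders.empty()) {
        // Retry delivery of one deferred batch.
        in_flight.push(blocked_senders.front());
        blocked_senders.pop();
      }
      if (!batch_queue.empty()) {
        *out = batch_queue.front();
        batch_queue.pop_front();
        return Outcome::kBatch;
      }
      // Nothing to hand out yet: loop again, draining blocked_senders first.
    }
  }
};
```

In this model, a consumer that hits the window where both queues are empty gets kWait rather than kEos, because TrySendEos cannot succeed until FinishDeferredBatch has made batch_queue non-empty. The loop also terminates on every call, since each pass without a batch removes one entry from blocked_senders.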