Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 8:

(23 comments)

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h@435
PS8, Line 435: a request
'num_request' requests


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@62
PS8, Line 62: 10000
how was that chosen? do we have a test case that causes this queue to fill up?


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@109
PS8, Line 109:   // Transfer the early senders into 'deferred_rpcs_' queue of the corresponding
             :   // sender queue. This makes sure new incoming RPCs won't pass these early senders,
             :   // leading to starvation.
this comment seems out of place. this is more an implementation detail of the receiver and handled properly inside ProcessEarlySender(). You could incorporate this in the comment for ProcessEarlySender() (to motivate why it uses the deferred_rpcs_ queue).


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@129
PS8, Line 129: .
and start a deserialization task to process it asynchronously.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@130
PS8, Line 130: Transfer
this "transfer" is in the opposite direction of how our "Transfer" methods usually go (e.g. src->TransferResourcesOwnership(dest)). Maybe call this ProcessEarlySender() (though I don't love "process" either since it's so vague).


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@197
PS8, Line 197: time
cpu time or wall time?


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@200
PS8, Line 200: time
same question


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@67
PS8, Line 67: data
the resources


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@72
PS8, Line 72: 'payload'
RPC state


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@73
PS8, Line 73: deferred_rpcs_
we shouldn't normally refer to private fields in public class comments, but given this is an internal class to the recvr, we can leave this.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@87
PS8, Line 87:   void TransferEarlySender(std::unique_ptr<TransmitDataCtx> ctx);
same comment as recvr header comment.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@195
PS8, Line 195:   while (current_batch_.get() == nullptr) {
I don't think we need this loop. see other comments in this function.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@197
PS8, Line 197: !is_cancelled_ && batch_queue_.empty()
nit: consider swapping the order of these so that the fast case comes first (!batch_queue_.empty()) but also to match the comment ("or we know we're done" corresponds to the is_cancelled_ and num_remaining_senders_ == 0 cases).


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@201
PS8, Line 201:       CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
             :       CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
             :       CANCEL_SAFE_SCOPED_TIMER(
             :           received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
             :           &is_cancelled_);
there's got to be a cleaner way to do this but ignore for now


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@210
PS8, Line 210: 
nit: i think we could do without some of the blank lines in this method to make more code fit on a screen


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@211
PS8, Line 211: deferred_rpcs_.empty() && batch_queue_.empty()
why not write this condition as:
num_remaining_senders_ == 0
then, it's more clear that these three conditions correspond to the loop exit conditions.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@218
PS8, Line 218: !batch_queue_.empty()
given that we just checked the other two loop exit conditions, isn't this definitely true? i.e. we don't need this guard it seems.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@227
PS8, Line 227: .
now that there might be space or the batch queue might be empty.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@229
PS8, Line 229: .
to parallelize the CPU bound deserialization work.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@230
PS8, Line 230:     // No point in dequeuing more than number of deserialization threads available.
true, though this doesn't quite make sense given that the thread pool is shared across all recvrs. but i guess it's an upper bound.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@239
PS8, Line 239:     l.unlock();
once you get rid of the loop, I think you'll be able to eliminate this unlock/lock/unlock and just drop the lock (via scope), which then also makes this easier to reason about.

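
To make the comments on lines 195 through 239 concrete, here is a rough sketch of the shape I have in mind: one wait whose predicate is exactly the three exit conditions (fast case first), followed by straight-line code, with the lock released by scope. Everything here is a hypothetical stand-in and not the patch's code: `Batch` and `SenderQueueSketch` are invented for illustration, plain `std::mutex` / `std::condition_variable` replace our own lock and condition variable wrappers, and the timers and `deferred_rpcs_` handling are left out. Note that the predicate form of `wait()` still re-checks the condition internally on spurious wakeups; the point is only that the hand-written outer loop and the manual unlock/lock/unlock go away.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a row batch; not Impala's RowBatch.
struct Batch {
  int id;
};

// Hypothetical, simplified sender queue. Member names echo the review
// comments, but the class itself is illustrative only.
class SenderQueueSketch {
 public:
  explicit SenderQueueSketch(int num_senders)
    : num_remaining_senders_(num_senders) {}

  // Enqueues a batch and wakes up one waiting consumer.
  void AddBatch(std::unique_ptr<Batch> batch) {
    {
      std::lock_guard<std::mutex> l(lock_);
      batch_queue_.push_back(std::move(batch));
    }
    data_arrival_cv_.notify_one();
  }

  // Marks one sender as finished.
  void SenderDone() {
    {
      std::lock_guard<std::mutex> l(lock_);
      assert(num_remaining_senders_ > 0);
      --num_remaining_senders_;
    }
    data_arrival_cv_.notify_all();
  }

  // Cancels the queue; waiting consumers return nullptr.
  void Cancel() {
    {
      std::lock_guard<std::mutex> l(lock_);
      is_cancelled_ = true;
    }
    data_arrival_cv_.notify_all();
  }

  // Returns the next batch, or nullptr if cancelled or if every sender is
  // done and the queue is drained. Every return path releases 'lock_' through
  // the unique_lock destructor, so there is no manual unlock().
  std::unique_ptr<Batch> GetBatch() {
    std::unique_lock<std::mutex> l(lock_);
    // The three exit conditions, with the fast case first.
    data_arrival_cv_.wait(l, [this] {
      return !batch_queue_.empty() || is_cancelled_
          || num_remaining_senders_ == 0;
    });
    if (is_cancelled_) return nullptr;
    // Not cancelled and nothing queued: the only remaining reason the wait
    // returned is that all senders are done.
    if (batch_queue_.empty()) return nullptr;
    std::unique_ptr<Batch> result = std::move(batch_queue_.front());
    batch_queue_.pop_front();
    return result;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<std::unique_ptr<Batch>> batch_queue_;
  bool is_cancelled_ = false;
  int num_remaining_senders_;
};
```

In this sketch cancellation takes priority over queued batches, and batches already queued are still returned after the last sender finishes. Whether the real sender queue should behave the same way is a question for the patch, not something this sketch settles.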

http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@389
PS8, Line 389:     deferred_rpcs_.pop();
at this point, 'ctx' effectively takes ownership, right? we should add a comment that says that and that says we cannot return without "delete ctx". Even better would be to move the front() into a temp unique_ptr in the function scope. Or you could always pop by moving into a unique_ptr and re-push_front() if you find there is no space. But the main point is it'd be nice to use a smart pointer to ensure we don't return and leak.


http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@406
PS8, Line 406:   ++num_deserialize_tasks_pending_;
             :   COUNTER_ADD(recvr_->num_deferred_batches_, 1);
nit: let's reorder these two lines since num_deferred_batches_ is counting the number of things that go into deferred_rpcs_ while num_deserialize_tasks_pending_ is counting the number of tasks added to the DS mgr.


-- 
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 8
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Mon, 06 Nov 2017 21:58:30 +0000
Gerrit-HasComments: Yes