Sailesh Mukil has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

(14 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: sent
nit: received


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
             :   /// payload.
Why would there be a TransmitData() RPC if EndDataStream() has already been sent? Doesn't the sender send it only if it knows all its TransmitData() RPCs have been processed?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133
PS1, Line 133:   /// period expires.
As per Tim's comment above, I would also reference IMPALA-3990 as a TODO here for later fixing.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
             :   /// not. That's enough state to realise that a receiver has failed (rather than not
             :   /// prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
             :   /// having the closed-stream list.
It would be nice to have a JIRA for this and reference it here.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353
PS1, Line 353: waiting_senders
This is a little confusing to follow in the .cc file, since when I see "waiting_senders", I expect it to be a set of some unique identifiers for a Sender ID. Although this is unique to a specific sender, it would be a little clearer to call this 'waiting_senders_ctxs'. Let me know what you think.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356
PS1, Line 356: closed_senders
Similarly, we could call this 'closed_senders_ctxs'.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168
PS1, Line 168:     num_senders_waiting_->Increment(1);
             :     total_senders_waited_->Increment(1);
             :     RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
             :     auto payload =
             :         make_unique<TransmitDataCtx>(proto_batch, context, request, response);
             :     early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
I'm wondering if it makes sense to add simple inline functions that encapsulate this functionality, for the sake of readability. E.g.: AddEarlyWaitingSender(), AddEarlyClosedSender()


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@213
PS1, Line 213: If no receiver found, but not in the closed stream cache
nit: If no receiver is found, and the receiver is not in the closed stream cache as well, we still need...


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@218
PS1, Line 218:     RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id);
             :     auto payload = make_unique<EndDataStreamCtx>(context, request, response);
             :     early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload));
             :     num_senders_waiting_->Increment(1);
             :     total_senders_waited_->Increment(1);
AddEarlyClosedSender() as per comment above, if you agree.

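To make the helper suggestion concrete, here is a minimal, standalone sketch of what AddEarlyWaitingSender() and AddEarlyClosedSender() could look like. It is not the patch's code: the context structs are empty stand-ins, plain integer counters replace the metrics behind num_senders_waiting_ and total_senders_waited_, the class name EarlySenderTracker and the RecordEarlySender() helper are hypothetical, and the field names use the 'waiting_senders_ctxs' / 'closed_senders_ctxs' renames proposed above. Locking is omitted entirely.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Stand-ins for the real RPC context types (hypothetical, heavily simplified).
struct TransmitDataCtx { int sender_id; };
struct EndDataStreamCtx { int sender_id; };

// Assumed shape: (fragment instance id, destination node id).
using RecvrId = std::pair<int64_t, int>;

struct EarlySendersList {
  std::vector<std::unique_ptr<TransmitDataCtx>> waiting_senders_ctxs;
  std::vector<std::unique_ptr<EndDataStreamCtx>> closed_senders_ctxs;
};

// Hypothetical holder for the early-sender bookkeeping. In the patch this state
// lives in the data stream manager; it is isolated here only for illustration.
class EarlySenderTracker {
 public:
  // Queues a TransmitData() context that arrived before its receiver was found.
  void AddEarlyWaitingSender(const RecvrId& recvr_id,
                             std::unique_ptr<TransmitDataCtx> ctx) {
    assert(ctx != nullptr);
    early_senders_map_[recvr_id].waiting_senders_ctxs.push_back(std::move(ctx));
    RecordEarlySender();
  }

  // Queues an EndDataStream() context that arrived before its receiver was found.
  void AddEarlyClosedSender(const RecvrId& recvr_id,
                            std::unique_ptr<EndDataStreamCtx> ctx) {
    assert(ctx != nullptr);
    early_senders_map_[recvr_id].closed_senders_ctxs.push_back(std::move(ctx));
    RecordEarlySender();
  }

  int64_t num_senders_waiting() const { return num_senders_waiting_; }
  int64_t total_senders_waited() const { return total_senders_waited_; }

  // Number of queued TransmitData() contexts for 'recvr_id' (0 if none).
  std::size_t NumWaitingCtxs(const RecvrId& recvr_id) const {
    auto it = early_senders_map_.find(recvr_id);
    return it == early_senders_map_.end() ? 0 : it->second.waiting_senders_ctxs.size();
  }

  // Number of queued EndDataStream() contexts for 'recvr_id' (0 if none).
  std::size_t NumClosedCtxs(const RecvrId& recvr_id) const {
    auto it = early_senders_map_.find(recvr_id);
    return it == early_senders_map_.end() ? 0 : it->second.closed_senders_ctxs.size();
  }

 private:
  // Both call sites bump the same two counters, so the update lives in one place.
  void RecordEarlySender() {
    ++num_senders_waiting_;
    ++total_senders_waited_;
  }

  std::map<RecvrId, EarlySendersList> early_senders_map_;
  int64_t num_senders_waiting_ = 0;   // stand-in for the metric of the same name
  int64_t total_senders_waited_ = 0;  // stand-in for the metric of the same name
};
```

With something along these lines, each of the two call sites would shrink to a single call, and the counter updates could not drift apart between the TransmitData() and EndDataStream() paths.
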
http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227
PS1, Line 227:   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
             :   Status::OK().ToProto(response->mutable_status());
             :   context->RespondSuccess();
This may need some modification based on the recent commit for IMPALA-5199:
https://github.com/apache/incubator-impala/commit/5119ced50c0e0c4001621c9d4da598c187bdb580


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@75
PS1, Line 75: ("new data")
I'm having some trouble understanding what this means. Could you please clarify?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@271
PS1, Line 271:   data_arrival_cv_.notify_all();
Shouldn't this notify be done while holding the lock_ ?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@285
PS1, Line 285:   while (!blocked_senders_.empty()) {
nit: Add comment: Respond to blocked senders' RPCs


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc@208
PS1, Line 208: %
Should we do a bitwise (cur_batch_idx_ + 1) & 1 instead? Or would the compiler take care of that?


-- 
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Mon, 25 Sep 2017 14:49:37 +0000
Gerrit-HasComments: Yes