Sailesh Mukil has posted comments on this change. (http://gerrit.cloudera.org:8080/8023)

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

(14 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: sent
nit: received


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
             : /// payload.
Why would there be a TransmitData() RPC if EndDataStream() has already been
sent? Doesn't the sender send EndDataStream() only once it knows all of its
TransmitData() RPCs have been processed?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133
PS1, Line 133: /// period expires.
As per Tim's comment above, I would also reference IMPALA-3990 as a TODO here 
for later fixing.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
             : ///   not. That's enough state to realise that a receiver has failed (rather than not
             : ///   prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
             : ///   having the closed-stream list.
It would be nice to have a JIRA for this and reference it here.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353
PS1, Line 353: waiting_senders
This is a little confusing to follow in the .cc file: when I see
"waiting_senders", I expect it to be a set of unique sender identifiers (e.g.
sender IDs), not per-RPC contexts.

Although each entry is unique to a specific sender, it would be a little
clearer to call this 'waiting_senders_ctxs'.

Let me know what you think.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356
PS1, Line 356: closed_senders
Similarly, we could call this 'closed_senders_ctxs'.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168
PS1, Line 168:       num_senders_waiting_->Increment(1);
             :       total_senders_waited_->Increment(1);
             :       RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
             :       auto payload =
             :           make_unique<TransmitDataCtx>(proto_batch, context, request, response);
             :       early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
I'm wondering if it makes sense to add simple inline functions that encapsulate
this functionality, for the sake of readability.

E.g.: AddEarlyWaitingSender(), AddEarlyClosedSender()
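For illustration only, a minimal sketch of what such helpers could look like.
It assumes simplified stand-in types: RecvrId is reduced to a pair of
integers, the ctx structs carry just a sender id, and Counter stands in for
the real metrics. EarlySendersSketch and IncrementWaitingMetrics() are
hypothetical names, not part of the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins. The real RecvrId pairs a TUniqueId with
// a PlanNodeId, and the real ctx objects hold the RPC context, request and response.
using RecvrId = std::pair<int64_t, int32_t>;
struct TransmitDataCtx { int sender_id; };
struct EndDataStreamCtx { int sender_id; };

struct EarlySendersList {
  std::vector<std::unique_ptr<TransmitDataCtx>> waiting_senders;
  std::vector<std::unique_ptr<EndDataStreamCtx>> closed_senders;
};

// Stand-in for the metric counters updated at both call sites.
struct Counter {
  int64_t value = 0;
  void Increment(int64_t delta) { value += delta; }
};

class EarlySendersSketch {
 public:
  // Hypothetical helper: parks a TransmitData() RPC whose receiver isn't registered yet.
  void AddEarlyWaitingSender(const RecvrId& recvr_id, std::unique_ptr<TransmitDataCtx> ctx) {
    assert(ctx != nullptr);  // stands in for a DCHECK
    early_senders_map_[recvr_id].waiting_senders.push_back(std::move(ctx));
    IncrementWaitingMetrics();
  }

  // Hypothetical helper: parks an EndDataStream() RPC whose receiver isn't registered yet.
  void AddEarlyClosedSender(const RecvrId& recvr_id, std::unique_ptr<EndDataStreamCtx> ctx) {
    assert(ctx != nullptr);  // stands in for a DCHECK
    early_senders_map_[recvr_id].closed_senders.push_back(std::move(ctx));
    IncrementWaitingMetrics();
  }

  std::map<RecvrId, EarlySendersList> early_senders_map_;
  Counter num_senders_waiting_;
  Counter total_senders_waited_;

 private:
  // Both call sites bump the same pair of metrics, so keep that in one place.
  void IncrementWaitingMetrics() {
    num_senders_waiting_.Increment(1);
    total_senders_waited_.Increment(1);
  }
};
```

Functions defined inside the class body are implicitly inline, so this should
cost nothing over the current open-coded version.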


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@213
PS1, Line 213: If no receiver found, but not in the closed stream cache
nit: If no receiver is found and the receiver is not in the closed stream
cache either, we still need...


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@218
PS1, Line 218:       RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id);
             :       auto payload = make_unique<EndDataStreamCtx>(context, request, response);
             :       early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload));
             :       num_senders_waiting_->Increment(1);
             :       total_senders_waited_->Increment(1);
AddEarlyClosedSender() as per comment above, if you agree.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227
PS1, Line 227:   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
             :   Status::OK().ToProto(response->mutable_status());
             :   context->RespondSuccess();
This may need some modification based on the recent commit for IMPALA-5199:
https://github.com/apache/incubator-impala/commit/5119ced50c0e0c4001621c9d4da598c187bdb580


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@75
PS1, Line 75: ("new data")
I'm having some trouble understanding what this means. Could you please clarify?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@271
PS1, Line 271: data_arrival_cv_.notify_all();
Shouldn't this notify be done while holding the lock_?
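For reference, a minimal sketch of the pattern I have in mind, using
std::mutex / std::condition_variable as stand-ins for whatever primitives the
receiver actually uses; BatchQueueSketch, AddBatch(), Close(), GetBatch() and
DrainSum() are hypothetical names. The standard does allow notifying without
holding the mutex, as long as the shared state was modified under it. Holding
the lock is the conservative choice, though: the waiter cannot return from
wait() (and possibly tear down the object owning the cv) until the notifier
has released the lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a receiver queue: producers enqueue batches and
// wake a consumer blocked in GetBatch(). Only lock_ and data_arrival_cv_
// mirror names from the patch.
class BatchQueueSketch {
 public:
  void AddBatch(int batch) {
    std::lock_guard<std::mutex> l(lock_);
    batches_.push_back(batch);
    // Notify while lock_ is still held.
    data_arrival_cv_.notify_all();
  }

  void Close() {
    std::lock_guard<std::mutex> l(lock_);
    is_closed_ = true;
    data_arrival_cv_.notify_all();
  }

  // Blocks until a batch is available or the queue is closed. Returns false
  // only once the queue is closed and fully drained.
  bool GetBatch(int* batch) {
    assert(batch != nullptr);
    std::unique_lock<std::mutex> l(lock_);
    data_arrival_cv_.wait(l, [this] { return !batches_.empty() || is_closed_; });
    if (batches_.empty()) return false;
    *batch = batches_.front();
    batches_.pop_front();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<int> batches_;
  bool is_closed_ = false;
};

// Usage: one consumer thread drains batches produced by the calling thread.
inline int DrainSum() {
  BatchQueueSketch q;
  int sum = 0;
  std::thread consumer([&] { int b; while (q.GetBatch(&b)) sum += b; });
  q.AddBatch(1);
  q.AddBatch(2);
  q.AddBatch(3);
  q.Close();
  consumer.join();
  return sum;
}
```

If the notify is kept outside the lock for performance, it would be worth a
comment explaining why the receiver can't be destroyed in between.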


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@285
PS1, Line 285:   while (!blocked_senders_.empty()) {
nit: Add comment: Respond to blocked senders' RPCs


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc@208
PS1, Line 208: %
Should we do a bitwise (cur_batch_idx_ + 1) & 1 instead? Or would the compiler 
take care of that?
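A small sketch of the difference, assuming the index alternates between two
batches (i.e. the modulus is 2); NextIdxMod() and NextIdxAnd() are
hypothetical names. Both forms agree for any non-negative index. For a signed
int the compiler can't assume the operand is non-negative, so `% 2` typically
compiles to a few extra instructions to preserve the sign; for an unsigned
operand, optimizing compilers generally emit the same AND for both.

```cpp
// Two ways to flip a two-slot index between 0 and 1.
constexpr int NextIdxMod(int cur_batch_idx) { return (cur_batch_idx + 1) % 2; }
constexpr int NextIdxAnd(int cur_batch_idx) { return (cur_batch_idx + 1) & 1; }

static_assert(NextIdxMod(0) == 1 && NextIdxAnd(0) == 1, "0 -> 1");
static_assert(NextIdxMod(1) == 0 && NextIdxAnd(1) == 0, "1 -> 0");
// They differ only for negative operands, which this index never holds:
// signed % keeps the sign of the dividend.
static_assert(NextIdxMod(-2) == -1, "(-1) % 2 == -1");
```

So either the bitwise form or an unsigned index type would avoid relying on
the optimizer here.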



--
To view, visit http://gerrit.cloudera.org:8080/8023

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Mon, 25 Sep 2017 14:49:37 +0000
Gerrit-HasComments: Yes
