Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

(20 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: sent
> nit: received
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
             : /// payload.
> Why would there be a TransmitData() RPC if EndDataStream() has already been
It's the expectation that the sender will not send any TransmitData() RPCs after 
the EndDataStream() RPC. Comments updated.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@129
PS1, Line 129: /// In exceptional circumstances, the data stream manager will garbage-collect the closed
Comments added.

 > sending its last batch is bounded. That seems possible to solve
 > with sender-side state if the receiver notifies the sender that the
 > receiver was not present and the sender can infer it was closed
 > cleanly.

I'm not sure I follow the proposed solution. The sender can track whether it has 
ever successfully sent a row batch to the receiver. However, in the example 
above, instance 2 of F3 has never sent a row batch to the receiver before the 
receiver hits its limit and closes. In that case, it's not clear how the sender 
can differentiate between an early-sender case (i.e. the receiver is still being 
prepared) and a closed receiver.

A more foolproof solution seems to be for the coordinator to notify all backend 
nodes about completed/aborted/cancelled queries; that seems to be the absolutely 
safe point at which to remove closed-stream entries. Alternatively, we could use 
a statestore update to broadcast this information and have the maintenance thread 
in DataStreamMgr remove the receiver entries based on the updates.
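
Roughly, the alternative could look like this (purely a sketch; every helper and 
field named below is hypothetical and not part of this patch):

  // Hypothetical sketch only: none of these helpers/flags exist in this patch.
  void KrpcDataStreamMgr::Maintenance() {
    while (!shutdown_) {
      // Query ids reported as completed/aborted/cancelled, pushed by the
      // coordinator or gleaned from a statestore update.
      std::vector<TUniqueId> finished_queries = DequeueFinishedQueryIds();
      {
        lock_guard<mutex> l(lock_);
        for (const TUniqueId& query_id : finished_queries) {
          // The safe point: forget every closed-stream entry of this query.
          EraseClosedStreamsForQuery(query_id);
        }
      }
      SleepForMs(maintenance_interval_ms_);
    }
  }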


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133
PS1, Line 133: /// period expires.
> As per Tim's comment above, I would also reference IMPALA-3990 as a TODO he
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
             : ///   not. That's enough state to realise that a receiver has failed (rather than not
             : ///   prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
             : ///   having the closed-stream list.
> It would be nice to have a JIRA for this and reference it here.
Actually, now that I think about it, this idea may not work due to examples 
such as IMPALA-3990 above. The problem is that the receiver may have been 
closed (legitimately) even before a particular sender managed to send a batch 
to it. In that case, the sender would falsely assume that the receiver has failed. 
Similarly, if no rows were ever materialized on the sender side, we still 
need the closed-stream cache to differentiate between a closed receiver and a 
receiver which is still being prepared.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353
PS1, Line 353: waiting_senders
> This is a little confusing to follow in the .cc file, since when I see "wai
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356
PS1, Line 356: closed_senders
> Similarly, we could call this 'closed_senders_ctxs'.
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168
PS1, Line 168:       num_senders_waiting_->Increment(1);
             :       total_senders_waited_->Increment(1);
             :       RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
             :       auto payload =
             :           make_unique<TransmitDataCtx>(proto_batch, context, request, response);
             :       early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
> I'm wondering if it makes sense to add simple inline functions that encapsu
I don't find it too unreadable inline, but I guess it's less distracting 
if the logic is encapsulated in a function.
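
For reference, something along these lines (the name and exact signature are just 
a sketch of the quoted code, not necessarily what the next patch set will look like):

  // Sketch only; assumes the caller holds lock_.
  void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& fragment_instance_id,
      const ProtoRowBatch& proto_batch, const TransmitDataRequestPB* request,
      TransmitDataResponsePB* response, kudu::rpc::RpcContext* context) {
    num_senders_waiting_->Increment(1);
    total_senders_waited_->Increment(1);
    RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
    auto payload = make_unique<TransmitDataCtx>(proto_batch, context, request, response);
    early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
  }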


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198
PS1, Line 198: AddData
> Isn't the point of the deserialize pool to deserialize the payload early?
The deserialization pool's purpose is to avoid deserializing inline in the main 
thread for early or blocked senders. For instance, if there are multiple row 
batches (from multiple early senders) for a given receiver, the deserialization 
threads can keep deserializing the row batches in the queue while the main 
thread starts consuming the already deserialized row batches.

While it might be ideal to deserialize the row batches for early senders right 
away, we would have a hard time accounting for the memory here, as the 
MemTracker actually resides inside the receiver.
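
As a rough, self-contained illustration of that division of labour (plain C++ 
stand-ins, not Impala's ThreadPool/RowBatch types): a deserialization thread keeps 
turning queued payloads into row batches while the consumer drains whatever is 
already deserialized.

  #include <condition_variable>
  #include <deque>
  #include <mutex>
  #include <string>
  #include <vector>

  struct SerializedBatch { std::string payload; };   // stand-in for a serialized row batch
  struct RowBatch { std::vector<int> rows; };        // stand-in for a deserialized row batch

  std::mutex mu;
  std::condition_variable cv;
  std::deque<SerializedBatch> pending;               // payloads from early/blocked senders
  std::deque<RowBatch> batch_queue;                  // the receiver's batch queue
  bool no_more_senders = false;

  RowBatch Deserialize(const SerializedBatch& s) {   // pretend this is expensive
    return RowBatch{{static_cast<int>(s.payload.size())}};
  }

  // Runs on a deserialization-pool thread.
  void DeserializeLoop() {
    std::unique_lock<std::mutex> l(mu);
    while (!no_more_senders || !pending.empty()) {
      if (pending.empty()) { cv.wait(l); continue; }
      SerializedBatch s = std::move(pending.front());
      pending.pop_front();
      l.unlock();
      RowBatch b = Deserialize(s);                   // heavy work outside the lock
      l.lock();
      batch_queue.push_back(std::move(b));
      cv.notify_all();                               // wake the consuming main thread
    }
  }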


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@213
PS1, Line 213: If no receiver found, but not in the closed stream cache
> nit: If no receiver is found, and the receiver is not in the closed stream
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@218
PS1, Line 218:       RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id);
             :       auto payload = make_unique<EndDataStreamCtx>(context, request, response);
             :       early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload));
             :       num_senders_waiting_->Increment(1);
             :       total_senders_waited_->Increment(1);
> AddEarlyClosedSender() as per comment above, if you agree.
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227
PS1, Line 227:   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
             :   Status::OK().ToProto(response->mutable_status());
             :   context->RespondSuccess();
> This may need some modification based on the recent commit for IMPALA-5199:
This may be a bit subtle, but it's equivalent to the logic in the non-KRPC 
implementation. There are 3 cases:

1. The receiver is not found and has already been unregistered. In that case, 
there isn't much we need to do and it's okay to return Status::OK().

2. The receiver is not found and has not yet been unregistered, in which case 
the sender is treated as an early sender. We shouldn't reach here; we should 
have returned at line 223 above. The maintenance thread is responsible for 
timing out EOS connections.

3. The receiver is found, and we return Status::OK() in that case.
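
To spell it out against the quoted lines (annotations only, not the literal patch):

  // By this point, case 2 (receiver not found, not yet unregistered) has already
  // returned at line 223 above, so we're in case 1 or case 3.
  if (LIKELY(recvr != nullptr)) {
    // Case 3: receiver found; detach this sender before replying OK.
    recvr->RemoveSender(request->sender_id());
  }
  // Case 1 also falls through to here: the receiver was already unregistered,
  // so there is nothing to undo and OK is the right answer.
  Status::OK().ToProto(response->mutable_status());
  context->RespondSuccess();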


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@75
PS1, Line 75: ("new data")
> I'm having some trouble understanding what this means. Could you please cla
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@165
PS1, Line 165: Either we'll consume a row batch from batch_queue_, or it's empty
> Shouldn't there always be something in the batch_queue_ if there's somethin
The row batches in the blocked_senders_ queues are deserialized and enqueued in 
the context of the deserialization threads. In some cases, it's possible for 
the main thread to have exhausted batch_queue_ before the deserialization 
threads get around to moving entries from blocked_senders_ into batch_queue_.


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@166
PS1, Line 166: There is a window
> Just to make things clearer, could you specify what there's a window for?
The deserialization thread pool executes asynchronously to the main thread. 
Therefore, there may be a window in which a blocked sender's context has been 
removed from blocked_senders_ but its batch has not yet been enqueued into 
batch_queue_.
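
That's also why the consumer can't treat a non-empty blocked_senders_ as a 
guarantee that a batch is ready; it has to wait for a deserialized batch to 
actually land in batch_queue_, roughly like this (the exact predicate and lock 
types are illustrative, not the patch's code):

  // Illustrative wait loop on the consumer side.
  unique_lock<mutex> l(lock_);
  while (batch_queue_.empty()
      && !(num_remaining_senders_ == 0 && blocked_senders_.empty())) {
    data_arrival_cv_.wait(l);  // woken when a deserialized batch is enqueued
  }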


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@225
PS1, Line 225:
> There is a problem here. When we release lock_here, an arbitrary number of
We bumped recvr_->num_buffered_bytes_ at line 223 above, so other callers of 
AddBatch() shouldn't be able to go over the limit.
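
In other words, the bytes are reserved before lock_ is released, so concurrent 
AddBatch() calls already see the updated total. Roughly (simplified, method 
names approximate):

  // Sketch of the ordering relied upon here.
  {
    lock_guard<mutex> l(lock_);
    if (recvr_->ExceedsLimit(batch_size)) {
      // Over the limit: park this sender in blocked_senders_ and respond later.
      return;
    }
    // Reserve the bytes *before* dropping lock_ (what line 223 does), so other
    // AddBatch() callers cannot collectively push the receiver over the limit.
    recvr_->num_buffered_bytes_.Add(batch_size);
  }
  // Deserialization and enqueueing then happen outside the lock.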


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@271
PS1, Line 271: data_arrival_cv_.notify_all();
> Shouldn't this notify be done while holding the lock_ ?
Why?


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@284
PS1, Line 284:   for (const auto& queue_entry: batch_queue_) delete queue_entry.second;
> batch_queue_.clear() ?
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@285
PS1, Line 285:   while (!blocked_senders_.empty()) {
> nit: Add comment: Respond to blocked senders' RPCs
Done


http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc@208
PS1, Line 208: %
> Should we do a bitwise (cur_batch_idx_ + 1) & 1 instead? Or would the compi
The compiler should take care of that.
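
Standalone illustration (not the patch's code): with an unsigned index, the two 
forms generate identical code once optimization is enabled, so the more readable 
'%' is fine.

  #include <cstdint>

  // Toggle between the two in-flight batch buffers. For an unsigned operand,
  // '% 2' lowers to the same single AND instruction as '& 1'.
  inline uint32_t NextBatchIdx(uint32_t cur_batch_idx) {
    return (cur_batch_idx + 1) % 2;  // equivalent to (cur_batch_idx + 1) & 1
  }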



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Thu, 28 Sep 2017 18:28:31 +0000
Gerrit-HasComments: Yes
