Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

Patch Set 1:

(20 comments)

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: sent
> nit: received
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110
PS1, Line 110: no TransmitData() RPCs will successfully deliver their
             : /// payload.
> Why would there be a TransmitData() RPC if EndDataStream() has already been
The expectation is that the sender will not send any TransmitData() RPC after the EndDataStream() RPC. Comments updated.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@129
PS1, Line 129: /// In exceptional circumstances, the data stream manager will garbage-collect the closed
> sending its last batch is bounded. That seems possible to solve
> with sender-side state if the receiver notifies the sender that the
> receiver was not present and the sender can infer it was closed
> cleanly.
Comments added.

I'm not sure I followed the proposed solution. The sender can track whether it has ever successfully sent a row batch to the receiver. However, in the example above, instance 2 of F3 has never sent a row batch to the receiver before the receiver hits its limit and closes. In that case, it's not clear how the sender can differentiate between an early-sender case (i.e. the receiver is still being prepared) and a closed receiver.

A more fool-proof solution seems to be for the coordinator to notify all backend nodes about completed/aborted/cancelled queries; that appears to be the absolutely safe point at which to remove closed stream entries. Alternatively, we can use a statestore update to broadcast this information and have the maintenance thread in DataStreamMgr remove the receiver entries based on the updates.
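To make the bookkeeping in this thread concrete, here is a minimal sketch of a closed-stream cache keyed by receiver ID, with age-based eviction by a maintenance thread. The names (`ClosedStreamCache`, `MarkClosed`, the TTL parameter) are hypothetical illustrations, not Impala's actual implementation:

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <utility>

// Hypothetical sketch: a cache of recently closed receiver IDs. A lookup hit
// tells the data stream manager "this receiver was closed", as opposed to
// "this receiver is not yet prepared". A maintenance thread periodically
// evicts entries older than the TTL.
using RecvrId = std::pair<int64_t, int64_t>;  // (fragment instance id, dest node id)

class ClosedStreamCache {
 public:
  explicit ClosedStreamCache(std::chrono::seconds ttl) : ttl_(ttl) {}

  // Record that the receiver identified by 'id' has been closed.
  void MarkClosed(const RecvrId& id) {
    closed_[id] = std::chrono::steady_clock::now();
  }

  // True if 'id' was closed and has not yet been evicted.
  bool WasRecentlyClosed(const RecvrId& id) const {
    return closed_.count(id) > 0;
  }

  // Called periodically by a maintenance thread to bound the cache's size.
  void EvictExpired() {
    auto now = std::chrono::steady_clock::now();
    for (auto it = closed_.begin(); it != closed_.end();) {
      if (now - it->second > ttl_) {
        it = closed_.erase(it);
      } else {
        ++it;
      }
    }
  }

 private:
  std::chrono::seconds ttl_;
  std::map<RecvrId, std::chrono::steady_clock::time_point> closed_;
};
```

The ambiguity discussed above arises exactly when a lookup here misses *and* the receiver is absent from the active map: the stream may have been evicted from this cache, or never created.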
http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133
PS1, Line 133: /// period expires.
> As per Tim's comment above, I would also reference IMPALA-3990 as a TODO he
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159
PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or
             : /// not. That's enough state to realise that a receiver has failed (rather than not
             : /// prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than
             : /// having the closed-stream list.
> It would be nice to have a JIRA for this and reference it here.
Actually, now that I think about it, this idea may not work due to examples such as IMPALA-3990 above. The problem is that the receiver may have been closed (legitimately) even before a particular sender managed to send a batch to it, in which case the sender would falsely conclude that the receiver has failed. Similarly, if no rows were ever materialized on the sender side, we still need the closed-stream cache to differentiate between a closed receiver and a receiver which is still being prepared.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353
PS1, Line 353: waiting_senders
> This is a little confusing to follow in the .cc file, since when I see "wai
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356
PS1, Line 356: closed_senders
> Similarly, we could call this 'closed_senders_ctxs'.
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168
PS1, Line 168: num_senders_waiting_->Increment(1);
             : total_senders_waited_->Increment(1);
             : RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id());
             : auto payload =
             :     make_unique<TransmitDataCtx>(proto_batch, context, request, response);
             : early_senders_map_[recvr_id].waiting_senders.push_back(move(payload));
> I'm wondering if it makes sense to add simple inline functions that encapsu
I don't find it too unreadable inline, but I guess it's less distracting if the logic is encapsulated in a function.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198
PS1, Line 198: AddData
> Isn't the point of the deserialize pool to deserialize the payload early?
The deserialization pool's purpose is to avoid executing deserialization inline in the main thread for early or blocked senders. For instance, if there are multiple row batches (from multiple early senders) for a given receiver, the deserialization thread can continue to deserialize the row batches in the queue while the main thread starts consuming the deserialized row batches. While it may be ideal to deserialize the row batches for early senders, we may have a hard time accounting for the memory here, as the MemTracker actually resides inside the receiver.
http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@213
PS1, Line 213: If no receiver found, but not in the closed stream cache
> nit: If no receiver is found, and the receiver is not in the closed stream
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@218
PS1, Line 218: RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id);
             : auto payload = make_unique<EndDataStreamCtx>(context, request, response);
             : early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload));
             : num_senders_waiting_->Increment(1);
             : total_senders_waited_->Increment(1);
> AddEarlyClosedSender() as per comment above, if you agree.
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227
PS1, Line 227: if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
             : Status::OK().ToProto(response->mutable_status());
             : context->RespondSuccess();
> This may need some modification based on the recent commit for IMPALA-5199:
This may be a bit subtle, but it is equivalent to the logic in the non-KRPC implementation. There are 3 cases:

1. The receiver is not found and it has already been unregistered. In that case, there is not much we need to do and it's okay to return Status::OK().
2. The receiver is not found and it has not yet been unregistered, in which case it's treated as an early sender. We shouldn't reach here; we should have returned at line 223 above. The maintenance thread will be responsible for timing out EOS connections.
3. The receiver is found, and we return Status::OK() in that case.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@75
PS1, Line 75: ("new data")
> I'm having some trouble understanding what this means.
> Could you please cla
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@165
PS1, Line 165: Either we'll consume a row batch from batch_queue_, or it's empty
> Shouldn't there always be something in the batch_queue_ if there's somethin
The row batches in the blocked_senders_ queue are enqueued and deserialized in the context of the deserialization threads. In some cases, it's possible for the main thread to have exhausted the batch_queue_ before the deserialization threads get around to inserting entries from blocked_senders_ into batch_queue_.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@166
PS1, Line 166: There is a window
> Just to make things clearer, could you specify what there's a window for?
The deserialization thread pool executes asynchronously to the main thread. Therefore, there may be a window between the point at which a blocked sender's context is removed from blocked_senders_ and the point at which it's enqueued into batch_queue_.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@225
PS1, Line 225:
> There is a problem here. When we release lock_ here, an arbitrary number of
We bumped recvr_->num_buffered_bytes_ at line 223 above, so other callers of AddBatch() shouldn't be able to go over the limit.

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@271
PS1, Line 271: data_arrival_cv_.notify_all();
> Shouldn't this notify be done while holding the lock_ ?
Why?

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@284
PS1, Line 284: for (const auto& queue_entry: batch_queue_) delete queue_entry.second;
> batch_queue_.clear() ?
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@285
PS1, Line 285: while (!blocked_senders_.empty()) {
> nit: Add comment: Respond to blocked senders' RPCs
Done

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-sender.cc@208
PS1, Line 208: %
> Should we do a bitwise (cur_batch_idx_ + 1) & 1 instead? Or would the compi
The compiler should take care of that.

--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Thu, 28 Sep 2017 18:28:31 +0000
Gerrit-HasComments: Yes
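[Postscript on the @208 exchange in krpc-data-stream-sender.cc: for unsigned operands, `(i + 1) % 2` and `(i + 1) & 1` are semantically identical, which is why the compiler is free to emit the cheaper bitwise form. A quick check, with hypothetical helper names:]

```cpp
#include <cstdint>

// Sanity check that the modulo and bitwise-AND forms of the double-buffer
// index flip agree for unsigned values. The compiler performs exactly this
// strength reduction, so writing '% 2' costs nothing at runtime.
inline uint32_t NextIdxMod(uint32_t cur_batch_idx) { return (cur_batch_idx + 1) % 2; }
inline uint32_t NextIdxAnd(uint32_t cur_batch_idx) { return (cur_batch_idx + 1) & 1; }
```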