Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

Patch Set 7:

(81 comments)

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h
File be/src/exec/kudu-util.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h@45
PS6, Line 45: .ok())
> nit: use same mangling between this and KUDU_RETURN_IF_ERROR (prepend seems
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc
File be/src/runtime/exec-env.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc@89
PS6, Line 89: "processing threads. If left at default value 0, it will be set to number of CPU "
> how are these defaults chosen?
I didn't really change these parameters from Henry's patch. There are no good defaults for the service queue size anyway (IMPALA-6116), but 1024 seems to be a reasonable bound for untracked memory consumption. The latest PS sets num_svc_threads to match the number of cores on the system.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@97
PS6, Line 97: etion at any one
: /// time). The rate of transmission is controlled by the receiver: a sender will only
: /// schedule batch transmission when the previous transmissi
> i'm confused what the "two" cases are from this comment. Also, I think it'
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@103
PS6, Line 103: batch que
> deserialized?
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@104
PS6, Line 104: moved from t
> what is this "respectively" referring to?
The two cases. Removed as this seems unnecessary.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@249
PS6, Line 249: for Transm
> unclear what that means, maybe stale comment?
> And this should say something
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@258
PS6, Line 258:
> isn't that part of "memory pointed to by 'request'"? If so and you want to
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@294
PS6, Line 294:
> How about getting rid of this typedef? The code seems easier to understand
This data structure has been renamed to DeserializeTask and also changed in the latest patch.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@428
PS6, Line 428:
> a sender
Done

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314
PS3, Line 314: t
> Done
Oops, missed it. Will update in a later patch.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@59
PS6, Line 59:
> I don't have a strong preference either way, but it'd be nice to be consist
My general guideline is to use a lambda if it's a one- or two-liner and to use bind for larger functions. I guess there are many different opinions on this topic. I try to optimize for readability.
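To make the lambda-versus-bind guideline above concrete, here is a minimal sketch. It uses std::bind and std::function from the standard library, which may not be what the patch itself uses, and Worker, Process(), MakeShortTask() and MakeLongTask() are hypothetical names that do not come from the change.

```cpp
#include <cassert>
#include <functional>

// Hypothetical callback type; the real code has its own task signatures.
using Task = std::function<int(int)>;

class Worker {
 public:
  explicit Worker(int scale) : scale_(scale) {}

  // A "larger" function: several steps, so it reads better as a named member
  // that is bound into a callback than as an inline lambda body.
  int Process(int x) const {
    if (x < 0) x = 0;          // Clamp negative input.
    int result = x * scale_;   // Apply the scale factor.
    return result + 1;         // Add a fixed offset.
  }

  // One-liner: a lambda keeps the logic visible at the point of use.
  Task MakeShortTask() const {
    return [this](int x) { return x * scale_; };
  }

  // Larger body: bind the named member function instead of inlining it.
  Task MakeLongTask() const {
    return std::bind(&Worker::Process, this, std::placeholders::_1);
  }

 private:
  int scale_;
};
```

Both factories return the same callable type, so the choice is purely about where the reader finds the logic. Note that both capture 'this', so the Worker has to outlive the returned Task.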
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@322
PS6, Line 322: tonicMillis() + STREAM_E
> shouldn't be plural
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h@119
PS6, Line 119: row batch i
> row batch is deserialized
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@52
PS6, Line 52: total amount of
> it's not clear what that means just from reading the comment. It'd be nice
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@113
PS6, Line 113: erialized
> how about using unique_ptr since this owns the row batch (until it's transf
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127
PS6, Line 127: // If true, the receiver fragment for this stream got cancelled.
> given that a single soft limit is imposed across all sender queues, does it
For the non-merging case, there is essentially only one queue. For the merging case, I can see that it's possible for a blocked row batch destined for one sender queue to be passed by either new incoming row batches or blocked row batches destined for another sender queue. However, once a sender queue becomes empty, one of the blocked row batches will always be added, so it won't be starved forever. The passing behavior depends on how quickly the sender queue is consumed. The quicker the row batches are consumed for a given queue, the sooner its blocked sender queue gets drained and the higher the chance that it passes some slower queues. That seems to be fine, as it implicitly exerts back pressure on the producers of the sender queue being consumed at a lower pace than the others.
I may need to put more thought into other implications of this unfairness across different queues, but as we discussed offline, this doesn't necessarily need to be addressed in this patch.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@171
PS6, Line 171: // their batches, allowing the rec
> we talked about this in person, but want to note it so I don't forget: this
The proposed fix is to notify the deserialization thread to kick off a loop which moves as many items from blocked_senders_ to batch_queue_ as possible. An item shouldn't be dequeued from the blocked_senders_ queue unless it can be added.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@201
PS6, Line 201: if (is_cancelled_) return Status::CANCELLED;
missing return;

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@210
PS6, Line 210: if (!batch_queue_.empty()) {
missing return;

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@220
PS6, Line 220: // if we haven't notified them alread
> why do we have this DCHECK (i.e. why is this condition important here), and
This is important as we shouldn't expect an EOS RPC until the preceding TransmitData() RPC has been replied to. So, the number of senders should be > 0 at this point even for the blocked senders case.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@231
PS6, Line 231:
> update
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@237
PS6, Line 237:
> it seems like we should be ORing that with !block_senders_.empty(), or some
Fixed in the latest PS.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@320
PS6, Line 320: time to resend the batch that we c
This is leaking memory. No point in calling release() here anyway.
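The fix proposed above for Line 171 (drain blocked_senders_ into batch_queue_, but never dequeue an item that cannot be added) could look roughly like the sketch below. This is only an illustration under stated assumptions: PendingBatch, SenderQueueSketch, CanEnqueue() and the byte-based soft limit are hypothetical stand-ins, not the types or the admission rule used in the patch, and all locking and thread notification is omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for a deferred row batch; only its size matters here.
struct PendingBatch {
  int64_t bytes;
};

// Hypothetical, simplified view of one sender queue. The member names echo the
// review discussion, but the types and the limit check are assumptions.
struct SenderQueueSketch {
  std::deque<PendingBatch> blocked_senders;  // Batches waiting for room.
  std::deque<PendingBatch> batch_queue;      // Batches ready for the consumer.
  int64_t queued_bytes = 0;                  // Bytes currently in batch_queue.
  int64_t soft_limit_bytes = 0;              // Assumed soft limit on queued_bytes.

  // Assumed admission rule: an empty queue always accepts one batch (so a
  // blocked sender is not starved); otherwise the soft limit applies.
  bool CanEnqueue(const PendingBatch& b) const {
    return batch_queue.empty() || queued_bytes + b.bytes <= soft_limit_bytes;
  }

  // Moves as many blocked batches as possible into batch_queue and returns
  // how many were moved. The front item is only peeked at; it is popped
  // after it has been added, never before.
  int DrainBlockedSenders() {
    int moved = 0;
    while (!blocked_senders.empty()) {
      const PendingBatch& front = blocked_senders.front();
      if (!CanEnqueue(front)) break;  // Leave it in place and stop.
      queued_bytes += front.bytes;
      batch_queue.push_back(front);
      blocked_senders.pop_front();
      ++moved;
    }
    return moved;
  }
};
```

Peeking before popping is the point of the sketch: a batch that does not fit stays at the head of blocked_senders and keeps its position for the next drain attempt.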
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@322
PS6, Line 322: // up after the deferred batches to avoid starvation.
> doing this in Close() goes against the paradigm that Close() is only about
The latest PS moves this to Cancel().

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@57
PS6, Line 57: per_channel_buffer_size' is the soft limit in bytes of the buffering into the
: /// per-channel's accumula
> still not clear what that means. This isn't really the size of a buffer, is
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@111
PS6, Line 111: serialized row
> serialized
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@130
PS6, Line 130: nt channel t
> outbound row
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@133
PS6, Line 133: Index of the next OutboundRowBatch to u
> Maybe say:
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@67
PS6, Line 67: can
> it looks like there's a third interface now: SerializeAndSendBatch() that t
We still need the function SerializeAndSendBatch() for the kudu / hash partitioned schemes. I can see how it's possible to hide it as a private function, but then the random case also needs to provide an OutboundBatch for serialization. While we can use the outbound_batches_ in the KrpcDataStreamSender (similar to the unpartitioned case), it seems better to use the buffer of the chosen channel as we round-robin through them, and this allows more parallelism than the two outbound_batches_ in KrpcDataStreamSender.
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@81
PS6, Line 81: Teardown
> Teardown()
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@82
PS6, Line 82: Teardown
> Teardown()
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@123
PS6, Line 123: ment
: // instance execution thread.
> if the preceding RPC is still in-flight.
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@126
PS6, Line 126: co
> or if the preceding RPC failed.
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@135
PS6, Line 135: d tha
> frees
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@141
PS6, Line 141: // Return error status if either the last TransmitData() RPC or EOS RPC failed.
> Returns error status if...
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@157
PS6, Line 157:
> delete.
Done. Actually, this is still used in AddRow(). 'buffer_size_' can be removed though.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@166
PS6, Line 166: // The row batch for accumulating rows copied from AddRow().
> is that actually used?
Not anymore in the new code.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@176
PS6, Line 176: to keep per-channel bu
> the name of this is confusing because it's so similar to current_batch_idx_
Renamed to rpc_in_flight_batch_. I prefer to keep rpc_in_flight_ and rpc_in_flight_batch_ separate, since the EOS RPC doesn't really use rpc_in_flight_batch_. Moved rpc_in_flight_batch_ to be under lock_ in the class declaration.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@178
PS6, Line 178: "UNPARTITIONED" scheme.
> The outbound row batches are double-buffered so that we can serialize the n
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@179
PS6, Line 179:
:
: // Index into 'outbound_batches_' for the next availa
> then you can delete that (better to put the explanation for why we have two
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@181
PS6, Line 181: le OutboundRowBatch to serialize
: // into. This is read and written by the main execution thread. It
> that looks incorrect. current_batch_idx_ is incremented immediately after w
Renamed to next_batch_idx_.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@191
PS6, Line 191: // 'lock_' needs to be held when accessing the following fields.
> At least add the cross refernce:
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@201
PS6, Line 201:
> Add a quick comment for proxy_ and rpc_controller_
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@237
PS6, Line 237: rpc_retry_i
> an RPC
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@244
PS6, Line 244: _fn, const
> typo
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@250
PS6, Line 250: e only called from a fragment
: // executor thread.
> it can return CANCELLED
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@258
PS6, Line 258: nder
> code
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@257
PS6, Line 257: (e.g. Connection object was shutdown due to network errors)
: // or if the parent sender h
> what does that mean? when can that happen?
If the connection is closed due to a network error, all pending RPCs on that connection will be aborted.
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@261
PS6, Line 261: call
> code
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@264
PS6, Line 264: thread
> threads
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@275
PS6, Line 275: error status
> needs updating
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@279
PS6, Line 279: remote
> threads
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@367
PS6, Line 367: mgr();
> in RetryCb(), this case explicitly becomes MarkDone(CANCELLED), but here he
I see. In which case, we may as well skip the check here since we are checking it in RetryCb() anyway.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@369
PS6, Line 369: // the need to manage yet another thread pool.
> why do we need to schedule this on the reactor thread (rather than using ou
Mostly to avoid the complexity of managing yet another thread pool.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@382
PS6, Line 382: Y(controlle
> explicilty set that = Status::OK() to make it explicit that the first two i
Sure. I think the default value is Status::OK but yeah we can be more explicit about it.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@409
PS6, Line 409: TransmitData
> is that needed? seems like a redundant memset
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@437
PS6, Line 437: // deleted by destructor.
> comment why we need that:
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@451
PS6, Line 451: // If the remote receiver is closed already, there is no point in sending anything.
> FlushAndSendEos() does this DCHECK before the remote_recvr_closed_.
> make th
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@552
PS6, Line 552: if (rpc_in_flight_) {
: rpc_controller_.Cancel();
> while (rpc_in_flight_) rpc_done_cv_.wait(l);
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@657
PS6, Line 657: } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) {
> mentioned above: rather than a third way to do this, how about just making
Please see replies above.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@50
PS6, Line 50: RPC outb
> I think we should say something about KRPC to at least give that hint. mayb
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@60
PS6, Line 60: sizeof(tuple_of
> sizeof(tuple_offsets_[0]) seems clearer and more robust
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@104
PS6, Line 104: OutboundRowBatch that
> OutboundRowBatch
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@144
PS6, Line 144: /// Populate a row batch from the serialized row batch header, decompress / copy
: /// the tuple's data into a buffer and convert all offsets in 'tuple_ofsets' back
: /// into pointers into the tuple data's buf
> stale
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@354
PS6, Line 354: /// whether tuple_data is compressed. If an in-flight row is present in this row batch,
> we should preserve this comment when removing the thrift variant. So you co
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@424
PS6, Line 424: /// return. There are a total of num_rows * num_tuples_per_row offsets.
> nit: i don't think we generally have all these line breaks between paramete
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@426
PS6, Line 426: _d
> delete space
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@444
PS6, Line 444: ncomp
> delete
Actually, the input argument names need to be fixed to match the definition.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@447
PS6, Line 447:
> delete
Actually, the input argument names need to be fixed to match the definition.

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@455
PS6, Line 455: s is the size
> input_*
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@531
PS6, Line 531: _scratch_;
> OutboundProtoRowBatch
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@537
PS6, Line 537: /// '_start_row_idx' is the starting row index.
> this seems like a hack and we could do something simpler, but let's leave i
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc@241
PS6, Line 241: string* tuple_data, int64_t* uncompressed_size, bool* is_compressed) {
> this comment was probably meant to be deleted?
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc
File be/src/util/network-util.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@41
PS6, Line 41: using std::find;
> undo.
Bad rebase. Done

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@120
PS6, Line 120: return sock.Pa
> undo
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@29
PS6, Line 29: ance id,
> isn't this the id of the instance?
> The comment in KrpcDataStreamSender is
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@38
PS6, Line 38: tuple offsets'
> tuple offsets' buffer
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@43
PS6, Line 43: the tuple's
> tuple data's buffer
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@59
PS6, Line 59: // Sender instance id, unique within a fragment.
> same
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto
File common/protobuf/row_batch.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@25
PS6, Line 25: Please see TransmitDataRequestPB for details.
> delete
Done

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@32
PS6, Line 32: ptional int32 num_tuples_per_
> why is this needed? i don't see it used. The size of it is used, though it
Yes, we actually only need the size. Updated in the new patch. It'd be good to at least include it in case there is any mismatch with the receiver's row desc (e.g. DCHECK will fire). Turns out that our thrift implementation has a similar problem.

--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 7
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Fri, 03 Nov 2017 15:52:36 +0000
Gerrit-HasComments: Yes