Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 7:

(81 comments)

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h
File be/src/exec/kudu-util.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h@45
PS6, Line 45: .ok())
> nit: use same mangling between this and KUDU_RETURN_IF_ERROR (prepend seems
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc
File be/src/runtime/exec-env.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc@89
PS6, Line 89:     "processing threads. If left at default value 0, it will be set to number of CPU "
> how are these defaults chosen?
I didn't really change these parameters from Henry's patch.

There are no good defaults for the service queue size anyway (IMPALA-6116), but
1024 seems to be a reasonable bound for untracked memory consumption.

The latest PS sets num_svc_threads to match the number of cores on the system.
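
A minimal sketch of the "0 means match the core count" behavior described
above. ResolveNumSvcThreads() is a hypothetical helper for illustration and is
not taken from the patch; only the num_svc_threads setting and its default of
0 come from this thread.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper (not from the patch): resolves a "0 means auto" setting
// such as num_svc_threads to a concrete thread count.
inline int ResolveNumSvcThreads(int flag_value) {
  assert(flag_value >= 0);
  // An explicit positive value always wins.
  if (flag_value > 0) return flag_value;
  // hardware_concurrency() may return 0 when the core count is unknown,
  // so fall back to a single thread in that case.
  unsigned int cores = std::thread::hardware_concurrency();
  return cores > 0 ? static_cast<int>(cores) : 1;
}
```

With this shape, an operator can still pin the thread count explicitly, while
the default scales with the machine.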


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@97
PS6, Line 97: etion at any one
            : /// time). The rate of transmission is controlled by the receiver: a sender will only
            : /// schedule batch transmission when the previous transmissi
> i'm confused what the "two" cases are from this comment.  Also, I think it'
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@103
PS6, Line 103: batch que
> deserialized?
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@104
PS6, Line 104: moved from t
> what is this "respectively" referring to?
The two cases. Removed as this seems unnecessary.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@249
PS6, Line 249:  for Transm
> unclear what that means, maybe stale comment? And this should say something
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@258
PS6, Line 258:
> isn't that part of "memory pointed to by 'request'"? If so and you want to
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@294
PS6, Line 294:
> How about getting rid of this typedef? The code seems easier to understand
This data structure has been renamed to DeserializeTask and also changed in the 
latest patch.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@428
PS6, Line 428:
> a sender
Done


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h
File be/src/runtime/krpc-data-stream-mgr.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314
PS3, Line 314: t
> Done
Oops, missed it. Will update in a later patch.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc
File be/src/runtime/krpc-data-stream-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@59
PS6, Line 59:
> I don't have a strong preference either way, but it'd be nice to be consist
My general guideline is to use a lambda for one- or two-liners and bind for
larger functions. I guess there are many different opinions on this topic; I
try to optimize for readability.
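
To illustrate the guideline, here is a small sketch. Worker, MakeDoubler() and
MakeProcessFn() are hypothetical names, and std::bind is used as a stand-in
for whichever bind the codebase actually uses.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-in for a class with a larger member function that is
// used as a callback.
struct Worker {
  int processed = 0;
  void Process(int thread_id, const std::string& payload) {
    assert(thread_id >= 0);
    processed += static_cast<int>(payload.size());
  }
};

// One-liner callback: a lambda keeps the logic visible at the call site.
inline std::function<int(int)> MakeDoubler() {
  return [](int x) { return x * 2; };
}

// Larger callback: bind a named member function instead of inlining its body.
inline std::function<void(int, const std::string&)> MakeProcessFn(Worker* w) {
  using namespace std::placeholders;
  return std::bind(&Worker::Process, w, _1, _2);
}
```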


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@322
PS6, Line 322: tonicMillis() + STREAM_E
> shouldn't be plural
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h
File be/src/runtime/krpc-data-stream-recvr.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h@119
PS6, Line 119: row batch i
> row batch is deserialized
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@52
PS6, Line 52: total amount of
> it's not clear what that means just from reading the comment. It'd be nice
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@113
PS6, Line 113: erialized
> how about using unique_ptr since this owns the row batch (until it's transf
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127
PS6, Line 127:   // If true, the receiver fragment for this stream got cancelled.
> given that a single soft limit is imposed across all sender queues, does it
For the non-merging case, there is essentially only one queue.

For the merging case, I can see that a blocked row batch for one sender queue
can be passed by either new incoming row batches or blocked row batches for
another sender queue. However, once a sender queue becomes empty, one of the
blocked row batches will always be added, so it won't be starved forever. The
passing behavior depends on how quickly each sender queue is consumed. The
quicker the row batches are consumed for a given queue, the sooner its blocked
sender queue gets drained and the higher the chance that it passes some slower
queues. That seems fine, as it implicitly exerts back pressure on the producers
of any sender queue that is consumed at a slower pace than the others. I may
need to give more thought to other implications of this unfairness across
different queues but, as we discussed offline, this doesn't necessarily need to
be addressed in this patch.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@171
PS6, Line 171:   // their batches, allowing the rec
> we talked about this in person, but want to note it so I don't forget: this
The proposed fix is to notify the deserialization thread to kick off a loop
which moves as many items from blocked_senders_ to batch_queue_ as possible. An
item shouldn't be dequeued from blocked_senders_ unless it can be added to
batch_queue_.
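
A rough sketch of that loop, under simplifying assumptions: SenderQueueSketch
is a hypothetical stand-in rather than the class in krpc-data-stream-recvr.cc,
batches are modeled by their byte sizes only, and the soft limit is tracked per
queue here rather than across all sender queues.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical, simplified stand-in for a sender queue.
struct SenderQueueSketch {
  std::deque<int64_t> blocked_senders_;  // byte sizes of deferred batches
  std::deque<int64_t> batch_queue_;      // byte sizes of enqueued batches
  int64_t bytes_enqueued_ = 0;
  int64_t soft_limit_ = 0;

  bool CanEnqueue(int64_t batch_size) const {
    // An empty queue always admits one batch so that it cannot starve.
    return batch_queue_.empty() || bytes_enqueued_ + batch_size <= soft_limit_;
  }

  // Moves as many deferred batches as currently fit and returns how many were
  // moved. An item is only popped once we know it can be enqueued.
  int DrainBlockedSenders() {
    int moved = 0;
    while (!blocked_senders_.empty()) {
      int64_t size = blocked_senders_.front();  // peek, do not dequeue yet
      assert(size >= 0);
      if (!CanEnqueue(size)) break;
      blocked_senders_.pop_front();
      batch_queue_.push_back(size);
      bytes_enqueued_ += size;
      ++moved;
    }
    return moved;
  }
};
```

Peeking before popping is what keeps a batch from being lost or reordered when
the limit is hit partway through the drain.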


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@201
PS6, Line 201:     if (is_cancelled_) return Status::CANCELLED;
missing return;


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@210
PS6, Line 210:     if (!batch_queue_.empty()) {
missing return;


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@220
PS6, Line 220: // if we haven't notified them alread
> why do we have this DCHECK (i.e. why is this condition important here), and
This is important as we shouldn't expect an EOS RPC until the preceding
TransmitData() RPC has been replied to. So the number of senders should be > 0
at this point even for the blocked senders case.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@231
PS6, Line 231:
> update
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@237
PS6, Line 237:
> it seems like we should be ORing that with !block_senders_.empty(), or some
Fixed in the latest PS.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@320
PS6, Line 320:  time to resend the batch that we c
This is leaking memory. No point in calling release() here anyway.
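
For context, a small sketch of why a stray release() leaks: release() gives up
ownership without destroying the object, whereas moving the unique_ptr or
calling reset() keeps the lifetime accounted for. BatchSketch, TakeBatch() and
DropBatch() are hypothetical names and do not reproduce the code at this line.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for a row batch that counts live instances.
struct BatchSketch {
  static int live_count;
  BatchSketch() { ++live_count; }
  ~BatchSketch() { --live_count; }
};
int BatchSketch::live_count = 0;

// Transfers ownership to the caller; '*slot' is left null afterwards.
std::unique_ptr<BatchSketch> TakeBatch(std::unique_ptr<BatchSketch>* slot) {
  assert(slot != nullptr);
  return std::move(*slot);
}

// Destroys the owned batch. Calling release() here instead would return the
// raw pointer and, if nobody deletes it, leak the batch.
void DropBatch(std::unique_ptr<BatchSketch>* slot) {
  assert(slot != nullptr);
  slot->reset();
}
```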


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@322
PS6, Line 322:     // up after the deferred batches to avoid starvation.
> doing this in Close() goes against the paradigm that Close() is only about
The latest PS moves this to Cancel().


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h
File be/src/runtime/krpc-data-stream-sender.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@57
PS6, Line 57: per_channel_buffer_size' is the soft limit in bytes of the buffering into the
            :   /// per-channel's accumula
> still not clear what that means. This isn't really the size of a buffer, is
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@111
PS6, Line 111: serialized row
> serialized
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@130
PS6, Line 130: nt channel t
> outbound row
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@133
PS6, Line 133: Index of the next OutboundRowBatch to u
> Maybe say:
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@67
PS6, Line 67: can
> it looks like there's a third interface now: SerializeAndSendBatch() that t
We still need the function SerializeAndSendBatch() for the Kudu / hash
partitioned schemes. I can see how it's possible to hide it as a private
function but then the random case needs to also provide an OutboundBatch for
serialization. While we can use the outbound_batches_ in the
KrpcDataStreamSender (similar to the unpartitioned case), it seems better to
use the buffer of the chosen channel as we round-robin through them and this
allows more parallelism than the two outbound_batches_ in KrpcDataStreamSender.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@81
PS6, Line 81: Teardown
> Teardown()
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@82
PS6, Line 82: Teardown
> Teardown()
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@123
PS6, Line 123: ment
             :   // instance execution thread.
> if the preceding RPC is still in-flight.
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@126
PS6, Line 126: co
> or if the preceding RPC failed.
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@135
PS6, Line 135: d tha
> frees
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@141
PS6, Line 141:   // Return error status if either the last TransmitData() RPC or EOS RPC failed.
> Returns error status if...
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@157
PS6, Line 157:
> delete.
Done. Actually, this is still used in AddRow(). 'buffer_size_' can be removed 
though.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@166
PS6, Line 166:   // The row batch for accumulating rows copied from AddRow().
> is that actually used?
Not anymore in the new code.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@176
PS6, Line 176:  to keep per-channel bu
> the name of this is confusing because it's so similar to current_batch_idx_
Renamed to rpc_in_flight_batch_. I prefer to keep rpc_in_flight_ and
rpc_in_flight_batch_ separate, as the EOS RPC doesn't really use
rpc_in_flight_batch_.

Moved rpc_in_flight_batch_ to be under lock_ in class declaration.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@178
PS6, Line 178: "UNPARTITIONED" scheme.
> The outbound row batches are double-buffered so that we can serialize the n
Done
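
A minimal sketch of the double-buffering idea mentioned in the comment above,
assuming two buffers that are used alternately so one can be serialized into
while the other may still be referenced by an in-flight RPC.
DoubleBufferSketch and SerializeNext() are hypothetical, and std::string stands
in for the real outbound row batch type.

```cpp
#include <array>
#include <cassert>
#include <string>

// Hypothetical, simplified model of two alternating outbound buffers.
struct DoubleBufferSketch {
  static constexpr int kNumBuffers = 2;
  std::array<std::string, kNumBuffers> outbound_batches_;
  int next_batch_idx_ = 0;  // index of the next buffer to serialize into

  // "Serializes" 'rows' into the next free buffer and returns it. The caller
  // is assumed to wait for the RPC that used this buffer two calls ago before
  // calling again; that synchronization is not modeled here.
  std::string* SerializeNext(const std::string& rows) {
    assert(next_batch_idx_ >= 0 && next_batch_idx_ < kNumBuffers);
    std::string* buf = &outbound_batches_[next_batch_idx_];
    *buf = rows;
    next_batch_idx_ = (next_batch_idx_ + 1) % kNumBuffers;
    return buf;
  }
};
```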


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@179
PS6, Line 179:
             :
             :   // Index into 'outbound_batches_' for the next availa
> then you can delete that (better to put the explanation for why we have two
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@181
PS6, Line 181: le OutboundRowBatch to serialize
             :   // into. This is read and written by the main execution thread. It
> that looks incorrect. current_batch_idx_ is incremented immediately after w
Renamed to next_batch_idx_.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@191
PS6, Line 191:   // 'lock_' needs to be held when accessing the following fields.
> At least add the cross refernce:
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@201
PS6, Line 201:
> Add a quick comment for proxy_ and rpc_controller_
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@237
PS6, Line 237: rpc_retry_i
> an RPC
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@244
PS6, Line 244: _fn, const
> typo
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@250
PS6, Line 250: e only called from a fragment
             :   // executor thread.
> it can return CANCELLED
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@258
PS6, Line 258: nder
> code
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@257
PS6, Line 257:  (e.g. Connection object was shutdown due to network errors)
             :   // or if the parent sender h
> what does that mean? when can that happen?
If the connection is closed due to network error, all pending RPCs on that 
connection will be aborted.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@261
PS6, Line 261:  call
> code
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@264
PS6, Line 264: thread
> threads
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@275
PS6, Line 275: error status
> needs updating
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@279
PS6, Line 279: remote
> threads
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@367
PS6, Line 367: mgr();
> in RetryCb(), this case explicitly becomes MarkDone(CANCELLED), but here he
I see. In that case, we may as well skip the check here since we are checking
it in RetryCb() anyway.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@369
PS6, Line 369:     // the need to manage yet another thread pool.
> why do we need to schedule this on the reactor thread (rather than using ou
Mostly to avoid the complexity of managing yet another thread pool.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@382
PS6, Line 382: Y(controlle
> explicilty set that = Status::OK() to make it explicit that the first two i
Sure. I think the default value is Status::OK but yeah we can be more explicit 
about it.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@409
PS6, Line 409:   TransmitData
> is that needed? seems like a redundant memset
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@437
PS6, Line 437:   // deleted by destructor.
> comment why we need that:
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@451
PS6, Line 451:   // If the remote receiver is closed already, there is no point in sending anything.
> FlushAndSendEos() does this DCHECK before the remote_recvr_closed_. make th
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@552
PS6, Line 552:   if (rpc_in_flight_) {
             :     rpc_controller_.Cancel();
> while (rpc_in_flight_) rpc_done_cv_.wait(l);
Done
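
The suggested wait loop can be sketched as follows. ChannelSketch is a
hypothetical, stripped-down channel that borrows the member names from this
thread and uses std::mutex and std::condition_variable as stand-ins for the
actual synchronization types. The while loop, rather than a single if, guards
against spurious wakeups.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical, simplified channel; member names mirror the discussion above.
struct ChannelSketch {
  std::mutex lock_;
  std::condition_variable rpc_done_cv_;
  bool rpc_in_flight_ = false;  // protected by 'lock_'

  void StartRpc() {
    std::lock_guard<std::mutex> l(lock_);
    rpc_in_flight_ = true;
  }

  // Would be called from the RPC completion callback.
  void MarkDone() {
    {
      std::lock_guard<std::mutex> l(lock_);
      rpc_in_flight_ = false;
    }
    rpc_done_cv_.notify_all();
  }

  // Blocks until no RPC is in flight. Re-checks the predicate after every
  // wakeup, so a spurious wakeup cannot let the caller proceed early.
  void WaitForRpc() {
    std::unique_lock<std::mutex> l(lock_);
    while (rpc_in_flight_) rpc_done_cv_.wait(l);
    assert(!rpc_in_flight_);
  }
};
```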


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@657
PS6, Line 657:   } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) {
> mentioned above: rather than a third way to do this, how about just making
Please see replies above.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@50
PS6, Line 50: RPC outb
> I think we should say something about KRPC to at least give that hint. mayb
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@60
PS6, Line 60: sizeof(tuple_of
> sizeof(tuple_offsets_[0]) seems clearer and more robust
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@104
PS6, Line 104: OutboundRowBatch that
> OutboundRowBatch
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@144
PS6, Line 144:   /// Populate a row batch from the serialized row batch header, decompress / copy
             :   /// the tuple's data into a buffer and convert all offsets in 'tuple_ofsets' back
             :   /// into pointers into the tuple data's buf
> stale
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@354
PS6, Line 354:   /// whether tuple_data is compressed. If an in-flight row is present in this row batch,
> we should preserve this comment when removing the thrift variant. So you co
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@424
PS6, Line 424:   ///                  return. There are a total of num_rows * num_tuples_per_row offsets.
> nit: i don't think we generally have all these line breaks between paramete
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@426
PS6, Line 426: _d
> delete space
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@444
PS6, Line 444: ncomp
> delete
Actually, the input argument names need to be fixed to match the definition.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@447
PS6, Line 447:
> delete
Actually, the input argument names need to be fixed to match the definition.


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@455
PS6, Line 455: s is the size
> input_*
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@531
PS6, Line 531: _scratch_;
> OutboundProtoRowBatch
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@537
PS6, Line 537: /// '_start_row_idx' is the starting row index.
> this seems like a hack and we could do something simpler, but let's leave i
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc@241
PS6, Line 241:     string* tuple_data, int64_t* uncompressed_size, bool* is_compressed) {
> this comment was probably meant to be deleted?
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc
File be/src/util/network-util.cc:

http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@41
PS6, Line 41: using std::find;
> undo. Bad rebase.
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@120
PS6, Line 120: return sock.Pa
> undo
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@29
PS6, Line 29: ance id,
> isn't this the id of the instance?  The comment in KrpcDataStreamSender is
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@38
PS6, Line 38: tuple offsets'
> tuple offsets' buffer
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@43
PS6, Line 43: the tuple's
> tuple data's buffer
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@59
PS6, Line 59:   // Sender instance id, unique within a fragment.
> same
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto
File common/protobuf/row_batch.proto:

http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@25
PS6, Line 25: Please see TransmitDataRequestPB for details.
> delete
Done


http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@32
PS6, Line 32: ptional int32 num_tuples_per_
> why is this needed? i don't see it used. The size of it is used, though it
Yes, we actually only need the size. Updated in the new patch. It'd be good to
at least include it in case there is any mismatch with the receiver's row desc
(e.g. a DCHECK will fire). It turns out that our thrift implementation has a
similar problem.



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 7
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Fri, 03 Nov 2017 15:52:36 +0000
Gerrit-HasComments: Yes
