Dan Hecht has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 3:

(16 comments)

Some more comments, still going through.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@75
PS3, Line 75: cached
see comment in row-batch.h about this terminology.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@89
PS3, Line 89: // safe to free them once the callback has been invoked.
I think we should add a reference to KUDU-2011 somewhere here like:

Note that due to KUDU-2011, a timeout cannot be used with outbound sidecars. The 
client has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() 
should be the right place, except that's currently called too early).  
RpcController::Cancel(), however, ensures that the callback is called only 
after the RPC layer no longer references the sidecar buffer.
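
For example, the sender's teardown could look something like this (hypothetical 
sketch; the member and helper names are invented):

// Can't rely on an RPC timeout to bound the sidecar buffer's lifetime
// (KUDU-2011), so cancel the in-flight RPC and wait for the completion
// callback, which runs only after KRPC has dropped all sidecar references.
rpc_controller_.Cancel();
WaitForRpcCompletion();     // hypothetical: blocks until the callback has run
serialized_batch_.reset();  // now safe to free the sidecar buffer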


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@92
PS3, Line 92: query
query? does it mean fragment instance?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@124
PS3, Line 124: Shutdown the RPC thread
is that still accurate?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@139
PS3, Line 139:   int buffer_size_;
that could use a comment.


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@141
PS3, Line 141:   const TNetworkAddress address_;
             :   TUniqueId fragment_instance_id_;
             :   PlanNodeId dest_node_id_;
those could be commented together to say they identify the destination. it's a 
little odd that plan node id is prefixed "dest" when the others are not.

it also seems weird that we need both these and the req_ field, since shouldn't 
they just be stored there? Or it seems we should get rid of the req_ and just 
generate it when sending.
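
e.g. something like (sketch):

/// Identifies the destination of this channel: the receiving backend's
/// address, the receiving fragment instance, and the exchange node within it.
const TNetworkAddress address_;
TUniqueId fragment_instance_id_;
PlanNodeId dest_node_id_;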


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@159
PS3, Line 159: num_cached_proto_batches_
caps since constant?
also, can this ever be something other than 2 without changing the code? i.e. 
doesn't the code assume this value is 2 in various ways?
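
e.g. (sketch; name hypothetical):

/// Number of serialized row batches per channel. Note that parts of the code
/// implicitly assume this is 2, so it can't be changed without auditing them.
static constexpr int NUM_OUTBOUND_BATCHES = 2;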


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@175
PS3, Line 175: proxy
proxy_


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@197
PS3, Line 197:   bool remote_recvr_closed_ = false;
why is that needed now?
also, shouldn't we do something different, at a higher level, in that case 
(like cancel this instance)?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@200
PS3, Line 200:   Status rpc_status_;
I think it would help to associate this with rpc_in_flight_ - move them 
adjacent and say that rpc_status_ is valid only when rpc_in_flight_ is false, 
or something.
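
e.g. (sketch):

/// True while an RPC issued by this channel is in flight.
bool rpc_in_flight_ = false;

/// Status of the most recently completed RPC. Only valid when
/// 'rpc_in_flight_' is false.
Status rpc_status_;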


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@218
PS3, Line 218: 2
if we have the constant, shouldn't that use it?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@334
PS3, Line 334:   auto pred = [this]() -> bool { return !rpc_in_flight_ || ShouldTerminate(); };
             :   auto timeout = std::chrono::system_clock::now() + milliseconds(50);
             :   while (!rpc_done_cv_.wait_until(*lock, timeout, pred)) {
             :     timeout = system_clock::now() + milliseconds(50);
             :   }
seems simpler to just write:

while (rpc_in_flight_ && !ShouldTerminate()) {
  auto timeout = std::chrono::system_clock::now() + milliseconds(50);
  rpc_done_cv_.wait_until(*lock, timeout);
}

or even better, use wait_for(), which takes a relative timeout.
Or should we use our ConditionVariable wrapper? Especially if we want to start 
instrumenting these things better. But if it's work to switch it over, it's 
okay to keep std::condition_variable; let's at least make the code more 
straightforward.
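
For example, with wait_for() (sketch, untested):

while (rpc_in_flight_ && !ShouldTerminate()) {
  rpc_done_cv_.wait_for(*lock, std::chrono::milliseconds(50));
}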


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52
PS3, Line 52: struct ProtoRowBatch {
I think we should get rid of this structure altogether. IMO, the indirection 
just adds confusion.

On the receive side, it seems we can just get the header and sidecars directly 
from the request, which is already threaded through the RPC handler anyway. 
Pulling them into a ProtoRowBatch just makes it unclear where the 
not-yet-deserialized row batch comes from.

On the send side, I think we should just work directly on CachedProtoRowBatch 
(but rename that thing, see below). The indirection through the ProtoRowBatch 
pointers (aka Slice) makes the lifetime and ownership harder to reason about.

I also found the name confusing since only the header is protobuf. The rest is 
KRPC-specific stuff.

Any reason we shouldn't eliminate this?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@89
PS3, Line 89: CachedProtoRowBatch
what is "cached" about this?

How about calling this OutboundRowBatch, RpcRowBatch, or SerializedRowBatch?

The first name is maybe best, since it seems it can really only be used for the 
outbound direction.

ProtoRowBatch seems unnecessary in this case too, since we can just create the 
Slice on-the-fly when we want to send the OutboundRowBatch, no?
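
For example, on the send side, something like this (rough sketch; assumes 
kudu::rpc::RpcSidecar::FromSlice() and RpcController::AddOutboundSidecar(), and 
the batch's buffer name is invented):

// Build the sidecar Slice directly from the OutboundRowBatch's buffer.
unique_ptr<RpcSidecar> sidecar = RpcSidecar::FromSlice(
    Slice(batch->tuple_data.data(), batch->tuple_data.size()));
int sidecar_idx;
RETURN_IF_ERROR(FromKuduStatus(
    rpc_controller_.AddOutboundSidecar(move(sidecar), &sidecar_idx)));
req.mutable_row_batch_header()->set_tuple_data_sidecar_idx(sidecar_idx);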


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@431
PS3, Line 431:
             :   /// Overload for testing that allows the test to force the deduplication level.
             :   Status Serialize(TRowBatch* output_batch, bool full_dedup);
             :
             :   /// Shared implementation between thrift and protobuf to serialize this row batch.
             :   ///
             :   /// 'full_dedup': true if full deduplication is used.
             :   ///
             :   /// 'tuple_offsets': Updated to contain offsets of all tuples into 'tuple_data' upon
             :   /// return. There are a total of num_rows * num_tuples_per_row offsets. An offset
             :   /// of -1 records a NULL.
             :   ///
             :   /// 'tuple_data': Updated to hold the serialized tuples' data. If 'compression_type'
             :   /// is THdfsCompression::LZ4, this is LZ4 compressed.
             :   ///
             :   /// 'uncompressed_size': Updated with the uncompressed size of 'tuple_data'.
             :   ///
             :   /// 'compression_type': Updated with the compression type applied on 'tuple_data'.
             :   /// THdfsCompression::NONE if there is no compression applied.
             :   ///
             :   /// Returns error status if serialization failed. Returns OK otherwise.
             :   Status Serialize(bool full_dedup, vector<int32_t>* tuple_offsets, string* tuple_data,
             :       int64_t* uncompressed_size, THdfsCompression::type* compression_type);
             :
             :   /// Shared implementation between thrift and protobuf to deserialize a row batch.
             :   ///
             :   /// 'input_tuple_data': contains pointer and size of tuples' data buffer.
             :   /// If 'compression_type' is not THdfsCompression::NONE, tuple data is compressed.
             :   ///
             :   /// 'input_tuple_offsets': an int32_t array of tuples; offsets into 'input_tuple_data'.
             :   /// Used for populating the tuples in the row batch with actual pointers.
             :   ///
             :   /// 'uncompressed_size': the uncompressed size of 'input_tuple_data' if it's compressed.
             :   ///
             :   /// 'compression_type': If 'input_tuple_data' is compressed, it's the compression
             :   /// codec used.
             :   ///
             :   void Deserialize(const kudu::Slice& input_tuple_data,
             :       const kudu::Slice& input_tuple_offsets, int64_t uncompressed_size,
             :       THdfsCompression::type compression_type);
let's leave a TODO about cleaning this all up once we can remove the thrift 
implementation. do we have a JIRA to do that (not for milestone 1)?


http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc@64
PS3, Line 64:   ProtoRowBatch batch;
            :   Status status = FromKuduStatus(context->GetInboundSidecar(
            :       request->row_batch_header().tuple_data_sidecar_idx(), &batch.tuple_data));
            :   if (status.ok()) {
            :     status = FromKuduStatus(context->GetInboundSidecar(
            :         request->row_batch_header().tuple_offsets_sidecar_idx(), &batch.tuple_offsets));
            :   }
            :   if (status.ok()) {
            :     batch.header = request->row_batch_header();
see comment in row-batch.h.  I think we should do this later when we actually 
want to deserialize the row batch.  We have to keep the 'request' around until 
that time anyway, right?
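
For example, at deserialization time, something like (sketch; hypothetical call 
site, the header's uncompressed_size/compression_type accessors are assumed, and 
the PB-to-THdfsCompression enum conversion is hand-waved):

kudu::Slice tuple_data;
kudu::Slice tuple_offsets;
const auto& header = request->row_batch_header();
RETURN_IF_ERROR(FromKuduStatus(context->GetInboundSidecar(
    header.tuple_data_sidecar_idx(), &tuple_data)));
RETURN_IF_ERROR(FromKuduStatus(context->GetInboundSidecar(
    header.tuple_offsets_sidecar_idx(), &tuple_offsets)));
row_batch->Deserialize(tuple_data, tuple_offsets, header.uncompressed_size(),
    header.compression_type());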



--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 3
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Comment-Date: Wed, 25 Oct 2017 17:19:38 +0000
Gerrit-HasComments: Yes
