Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

Patch Set 3:

(16 comments)

Some more comments, still going through.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@75
PS3, Line 75: cached
See the comment in row-batch.h about this terminology.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@89
PS3, Line 89: // safe to free them once the callback has been invoked.
I think we should add a reference to KUDU-2011 somewhere here, like: "Note that due to KUDU-2011, a timeout cannot be used with outbound sidecars. The client has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() should be the right place, except that it's currently called too early). RpcController::Cancel(), however, ensures that the callback is called only after the RPC layer no longer references the sidecar buffer."

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@92
PS3, Line 92: query
"query"? Does it mean fragment instance?

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@124
PS3, Line 124: Shutdown the RPC thread
Is that still accurate?

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@139
PS3, Line 139: int buffer_size_;
That could use a comment.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@141
PS3, Line 141: const TNetworkAddress address_;
             : TUniqueId fragment_instance_id_;
             : PlanNodeId dest_node_id_;
These could be commented together to say that they identify the destination. It's a little odd that the plan node id is prefixed "dest" when the others are not. It also seems weird that we need both these and the req_ field, since shouldn't they just be stored there?
Or it seems we should get rid of the req_ and just generate it when sending.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@159
PS3, Line 159: num_cached_proto_batches_
Caps since it's a constant? Also, can this ever be something other than 2 without rewriting the code? I.e., doesn't the code assume this value is 2 in various ways?

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@175
PS3, Line 175: proxy
proxy_

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@197
PS3, Line 197: bool remote_recvr_closed_ = false;
Why is that needed now? Also, shouldn't we do something different, at a higher level, in that case (like cancel this instance)?

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@200
PS3, Line 200: Status rpc_status_;
I think it would help to associate this with rpc_in_flight_: move them adjacent and say that rpc_status_ is valid only when rpc_in_flight_ is false, or something like that.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@218
PS3, Line 218: 2
If we have the constant, shouldn't this use it?

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@334
PS3, Line 334: auto pred = [this]() -> bool { return !rpc_in_flight_ || ShouldTerminate(); };
             : auto timeout = std::chrono::system_clock::now() + milliseconds(50);
             : while (!rpc_done_cv_.wait_until(*lock, timeout, pred)) {
             :   timeout = system_clock::now() + milliseconds(50);
             : }
Seems simpler to just write:

  while (rpc_in_flight_ && !ShouldTerminate()) {
    auto timeout = std::chrono::system_clock::now() + milliseconds(50);
    rpc_done_cv_.wait_until(*lock, timeout);
  }

or, even better, to use wait_for(), which takes a relative timeout. Or should we use our ConditionVariable wrapper? Especially if we want to start instrumenting these things better.
But if it's work to switch it over, it's okay to keep std::condition_variable; let's at least make the code more straightforward, though.

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52
PS3, Line 52: struct ProtoRowBatch {
I think we should get rid of this structure altogether. IMO, the indirection just adds confusion. On the receive side, it seems we can just get the header and sidecars directly from the request, which is already threaded through the RPC handler anyway. Pulling them into a ProtoRowBatch just makes it unclear where the not-yet-deserialized row batch comes from. On the send side, I think we should just work directly on CachedProtoRowBatch (but rename that thing, see below). The indirection through the ProtoRowBatch pointers (aka Slice) makes the lifetime and ownership harder to reason about. I also found the name confusing, since only the header is protobuf; the rest is KRPC-specific stuff. Any reason we shouldn't eliminate this?

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@89
PS3, Line 89: CachedProtoRowBatch
What is "cached" about this? How about calling it OutboundRowBatch, RpcRowBatch, or SerializedRowBatch? The first name is maybe best, since it can really only be used for outbound batches, it seems. ProtoRowBatch seems unnecessary in this case too, since we can just create the Slice on the fly when we want to send the OutboundRowBatch, no?

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@431
PS3, Line 431:
             : /// Overload for testing that allows the test to force the deduplication level.
             : Status Serialize(TRowBatch* output_batch, bool full_dedup);
             :
             : /// Shared implementation between thrift and protobuf to serialize this row batch.
             : ///
             : /// 'full_dedup': true if full deduplication is used.
             : ///
             : /// 'tuple_offsets': Updated to contain offsets of all tuples into 'tuple_data' upon
             : /// return. There are a total of num_rows * num_tuples_per_row offsets. An offset
             : /// of -1 records a NULL.
             : ///
             : /// 'tuple_data': Updated to hold the serialized tuples' data. If 'compression_type'
             : /// is THdfsCompression::LZ4, this is LZ4 compressed.
             : ///
             : /// 'uncompressed_size': Updated with the uncompressed size of 'tuple_data'.
             : ///
             : /// 'compression_type': Updated with the compression type applied on 'tuple_data'.
             : /// THdfsCompression::NONE if there is no compression applied.
             : ///
             : /// Returns error status if serialization failed. Returns OK otherwise.
             : Status Serialize(bool full_dedup, vector<int32_t>* tuple_offsets, string* tuple_data,
             :     int64_t* uncompressed_size, THdfsCompression::type* compression_type);
             :
             : /// Shared implementation between thrift and protobuf to deserialize a row batch.
             : ///
             : /// 'input_tuple_data': contains pointer and size of tuples' data buffer.
             : /// If 'compression_type' is not THdfsCompression::NONE, tuple data is compressed.
             : ///
             : /// 'input_tuple_offsets': an int32_t array of tuples; offsets into 'input_tuple_data'.
             : /// Used for populating the tuples in the row batch with actual pointers.
             : ///
             : /// 'uncompressed_size': the uncompressed size of 'input_tuple_data' if it's compressed.
             : ///
             : /// 'compression_type': If 'input_tuple_data' is compressed, it's the compression
             : /// codec used.
             : ///
             : void Deserialize(const kudu::Slice& input_tuple_data,
             :     const kudu::Slice& input_tuple_offsets, int64_t uncompressed_size,
             :     THdfsCompression::type compression_type);

Let's leave a TODO about cleaning this all up once we can remove the thrift implementation. Do we have a JIRA to do that (not for milestone 1)?
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc@64
PS3, Line 64: ProtoRowBatch batch;
            : Status status = FromKuduStatus(context->GetInboundSidecar(
            :     request->row_batch_header().tuple_data_sidecar_idx(), &batch.tuple_data));
            : if (status.ok()) {
            :   status = FromKuduStatus(context->GetInboundSidecar(
            :       request->row_batch_header().tuple_offsets_sidecar_idx(), &batch.tuple_offsets));
            : }
            : if (status.ok()) {
            :   batch.header = request->row_batch_header();
See the comment in row-batch.h. I think we should do this later, when we actually want to deserialize the row batch. We have to keep the 'request' around until that time anyway, right?

--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 3
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Comment-Date: Wed, 25 Oct 2017 17:19:38 +0000
Gerrit-HasComments: Yes