yiguolei commented on code in PR #43281:
URL: https://github.com/apache/doris/pull/43281#discussion_r1835327728
##########
be/src/runtime/buffer_control_block.cpp:
##########
@@ -191,56 +269,115 @@ void BufferControlBlock::get_batch(GetResultBatchCtx*
ctx) {
ctx->on_data(result, _packet_num);
_packet_num++;
- _update_dependency();
return;
}
if (_is_close) {
ctx->on_close(_packet_num, _query_statistics.get());
- _update_dependency();
return;
}
// no ready data, push ctx to waiting list
_waiting_rpc.push_back(ctx);
- _update_dependency();
}
-Status
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
result) {
+Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>*
result,
+ cctz::time_zone& timezone_obj) {
std::unique_lock<std::mutex> l(_lock);
+ Defer defer {[&]() { _update_dependency(); }};
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}
- while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
- _arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
+ while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled &&
!_is_close) {
+ _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
}
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}
- if (!_arrow_flight_batch_queue.empty()) {
- *result = std::move(_arrow_flight_batch_queue.front());
- _arrow_flight_batch_queue.pop_front();
+ if (!_arrow_flight_result_batch_queue.empty()) {
+ *result = std::move(_arrow_flight_result_batch_queue.front());
+ _arrow_flight_result_batch_queue.pop_front();
+ timezone_obj = _timezone_obj;
+
for (auto it : _instance_rows_in_queue.front()) {
_instance_rows[it.first] -= it.second;
}
_instance_rows_in_queue.pop_front();
_packet_num++;
- _update_dependency();
return Status::OK();
}
// normal path end
if (_is_close) {
- _update_dependency();
+#ifndef NDEBUG
+ std::stringstream ss;
+ _profile.pretty_print(&ss);
+ LOG(INFO) << fmt::format(
+ "BufferControlBlock finished, fragment_id={}, is_close={},
is_cancelled={}, "
+ "packet_num={}, peak_memory_usage={}, profile={}",
+ print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+ _mem_tracker->peak_consumption(), ss.str());
+#endif
return Status::OK();
}
return Status::InternalError("Get Arrow Batch Abnormal Ending");
}
+void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ Defer defer {[&]() { _update_dependency(); }};
+ if (!_status.ok()) {
+ ctx->on_failure(_status);
+ return;
+ }
+ if (_is_cancelled) {
+ ctx->on_failure(Status::Cancelled("Cancelled"));
+ return;
+ }
+
+ if (!_arrow_flight_result_batch_queue.empty()) {
+ auto block = _arrow_flight_result_batch_queue.front();
+ _arrow_flight_result_batch_queue.pop_front();
+ for (auto it : _instance_rows_in_queue.front()) {
+ _instance_rows[it.first] -= it.second;
+ }
+ _instance_rows_in_queue.pop_front();
+
+ ctx->on_data(block, _packet_num, _be_exec_version,
_fragement_transmission_compression_type,
+ _timezone, _arrow_schema_field_names,
_serialize_batch_ns_timer,
+ _uncompressed_bytes_counter, _compressed_bytes_counter);
+ _packet_num++;
+ return;
+ }
+
+ // normal path end
+ if (_is_close) {
+ ctx->on_close(_packet_num);
+#ifndef NDEBUG
+ std::stringstream ss;
Review Comment:
使用LOG DEBUG (English: use LOG DEBUG here.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]