github-actions[bot] commented on code in PR #43281:
URL: https://github.com/apache/doris/pull/43281#discussion_r1832203057
##########
be/src/service/arrow_flight/arrow_flight_batch_reader.cpp:
##########
@@ -18,52 +18,278 @@
#include "service/arrow_flight/arrow_flight_batch_reader.h"
#include <arrow/status.h>
+#include <arrow/type.h>
+#include <gen_cpp/internal_service.pb.h>
+
+#include <utility>
-#include "arrow/builder.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/result_buffer_mgr.h"
+#include "runtime/thread_context.h"
+#include "service/backend_options.h"
+#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/utils.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+#include "util/string_util.h"
+#include "vec/core/block.h"
+
+namespace doris::flight {
+
+constexpr size_t BRPC_CONTROLLER_TIMEOUT_MS = 60 * 1000;
+
+ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase(
+ const std::shared_ptr<QueryStatement>& statement)
+ : _statement(statement) {}
+
+std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const {
+ return _schema;
+}
-namespace doris {
-namespace flight {
+arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const
std::string& msg) {
+ std::string status_msg =
+ fmt::format("ArrowFlightBatchReader {}, packet_seq={}, result={}:{}, finistId={}", msg,
+ _packet_seq, _statement->result_addr.hostname,
_statement->result_addr.port,
+ print_id(_statement->query_id));
+ LOG(WARNING) << status_msg;
+ return arrow::Status::Invalid(status_msg);
+}
-std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const {
- return schema_;
+ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {
+#ifndef NDEBUG
+ LOG(INFO) << fmt::format(
+ "ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, "
+ "convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}",
+ _packet_seq, _statement->result_addr.hostname,
_statement->result_addr.port,
+ print_id(_statement->query_id), _convert_arrow_batch_timer,
_deserialize_block_timer,
+ _mem_tracker->peak_consumption());
+#endif
}
-ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement>
statement,
- std::shared_ptr<arrow::Schema>
schema)
- : statement_(std::move(statement)), schema_(std::move(schema)) {}
+ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader(
+ const std::shared_ptr<QueryStatement>& statement,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
+ : ArrowFlightBatchReaderBase(statement) {
+ _schema = schema;
+ _mem_tracker = mem_tracker;
+}
-arrow::Result<std::shared_ptr<ArrowFlightBatchReader>>
ArrowFlightBatchReader::Create(
- const std::shared_ptr<QueryStatement>& statement_) {
+arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>>
ArrowFlightBatchLocalReader::Create(
+ const std::shared_ptr<QueryStatement>& statement) {
+ DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost());
// Make sure that FE send the fragment to BE and creates the
BufferControlBlock before returning ticket
// to the ADBC client, so that the schema and control block can be found.
- auto schema =
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
- if (schema == nullptr) {
- ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
- "Client not found arrow flight schema, maybe query has been
canceled, queryid: {}",
- print_id(statement_->query_id))));
+ std::shared_ptr<arrow::Schema> schema;
+ RETURN_ARROW_STATUS_IF_ERROR(
+
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id,
&schema));
+ std::shared_ptr<MemTrackerLimiter> mem_tracker;
+
RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker(
+ statement->query_id, &mem_tracker));
+
+ std::shared_ptr<ArrowFlightBatchLocalReader> result(
+ new ArrowFlightBatchLocalReader(statement, schema, mem_tracker));
+ return result;
+}
+
+arrow::Status
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out)
{
+ // parameter *out not nullptr
+ *out = nullptr;
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ std::shared_ptr<vectorized::Block> result;
+ auto st =
ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id,
&result,
+
_timezone_obj);
+ st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed");
+ ARROW_RETURN_NOT_OK(to_arrow_status(st));
+ if (result == nullptr) {
+ // eof, normal path end
+ return arrow::Status::OK();
+ }
+
+ {
+ // convert one batch
+ SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
+ st = convert_to_arrow_batch(*result, _schema,
arrow::default_memory_pool(), out,
+ _timezone_obj);
+ st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch
failed");
+ ARROW_RETURN_NOT_OK(to_arrow_status(st));
+ }
+
+ _packet_seq++;
+ if (*out != nullptr) {
+ VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " <<
(*out)->num_rows() << ", "
+ << (*out)->num_columns() << ", packet_seq: " <<
_packet_seq;
+ }
+ return arrow::Status::OK();
+}
+
+ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader(
+ const std::shared_ptr<QueryStatement>& statement,
+ const std::shared_ptr<PBackendService_Stub>& stub)
+ : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub),
_block(nullptr) {
+ _mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::QUERY,
+ fmt::format("ArrowFlightBatchRemoteReader#QueryId={}",
print_id(_statement->query_id)));
+}
+
+arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>>
ArrowFlightBatchRemoteReader::Create(
+ const std::shared_ptr<QueryStatement>& statement) {
+ std::shared_ptr<PBackendService_Stub> stub =
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+ statement->result_addr);
+ if (!stub) {
+ std::string msg = fmt::format(
+ "ArrowFlightBatchRemoteReader get rpc stub failed, result_addr={}:{}, finistId={}",
+ statement->result_addr.hostname, statement->result_addr.port,
+ print_id(statement->query_id));
+ LOG(WARNING) << msg;
+ return arrow::Status::Invalid(msg);
}
- std::shared_ptr<ArrowFlightBatchReader> result(new
ArrowFlightBatchReader(statement_, schema));
+
+ std::shared_ptr<ArrowFlightBatchRemoteReader> result(
+ new ArrowFlightBatchRemoteReader(statement, stub));
+ ARROW_RETURN_NOT_OK(result->init_schema());
return result;
}
-arrow::Status
ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
- // *out not nullptr
+arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) {
Review Comment:
warning: function '_fetch_data' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) {
^
```
<details>
<summary>Additional context</summary>
**be/src/service/arrow_flight/arrow_flight_batch_reader.cpp:156:** 88 lines
including whitespace and comments (threshold 80)
```cpp
arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) {
^
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]