github-actions[bot] commented on code in PR #26650:
URL: https://github.com/apache/doris/pull/26650#discussion_r1387594172
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -261,11 +261,7 @@ class Channel {
_ch_cur_pb_block = &_ch_pb_block1;
}
- virtual ~Channel() {
- if (_closure != nullptr && _closure->unref()) {
- delete _closure;
- }
- }
+ virtual ~Channel() {}
Review Comment:
warning: use '= default' to define a trivial destructor
[modernize-use-equals-default]
```suggestion
virtual ~Channel() = default;
```
##########
be/src/service/internal_service.cpp:
##########
@@ -1524,37 +1524,34 @@ void
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
return;
}
- PTabletWriteSlaveDoneRequest request;
- request.set_txn_id(txn_id);
- request.set_tablet_id(tablet_id);
- request.set_node_id(node_id);
- request.set_is_succeed(is_succeed);
- RefCountClosure<PTabletWriteSlaveDoneResult>* closure =
- new RefCountClosure<PTabletWriteSlaveDoneResult>();
- closure->ref();
- closure->ref();
- closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec
* 1000);
- closure->cntl.ignore_eovercrowded();
- stub->response_slave_tablet_pull_rowset(&closure->cntl, &request,
&closure->result, closure);
-
- closure->join();
- if (closure->cntl.Failed()) {
+ auto request = std::make_shared<PTabletWriteSlaveDoneRequest>();
+ request->set_txn_id(txn_id);
+ request->set_tablet_id(tablet_id);
+ request->set_node_id(node_id);
+ request->set_is_succeed(is_succeed);
+ auto pull_rowset_callback =
DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared();
+ auto closure = AutoReleaseClosure<
+ PTabletWriteSlaveDoneRequest,
+
DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request,
+
pull_rowset_callback);
+
closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec *
1000);
Review Comment:
warning: 1000 is a magic number; consider replacing it with a named constant
[readability-magic-numbers]
```cpp
closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec *
1000);
^
```
##########
be/src/vec/sink/writer/vtablet_writer.cpp:
##########
@@ -280,21 +280,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent,
IndexChannel* index_channel, i
thread_context()->get_thread_id()));
}
-VNodeChannel::~VNodeChannel() {
- for (auto& closure : _open_closures) {
- if (closure != nullptr) {
- if (closure->unref()) {
- delete closure;
- }
- closure = nullptr;
- }
- }
- if (_add_block_closure != nullptr) {
- delete _add_block_closure;
- _add_block_closure = nullptr;
- }
- static_cast<void>(_cur_add_block_request.release_id());
-}
+VNodeChannel::~VNodeChannel() {}
Review Comment:
warning: use '= default' to define a trivial destructor
[modernize-use-equals-default]
```suggestion
VNodeChannel::~VNodeChannel() = default;
```
##########
be/src/vec/sink/writer/vtablet_writer.cpp:
##########
@@ -357,46 +343,47 @@
void VNodeChannel::_open_internal(bool is_incremental) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
- PTabletWriterOpenRequest request;
- request.set_allocated_id(&_parent->_load_id);
- request.set_index_id(_index_channel->_index_id);
- request.set_txn_id(_parent->_txn_id);
- request.set_allocated_schema(_parent->_schema->to_protobuf());
+ auto request = std::make_shared<PTabletWriterOpenRequest>();
+ request->set_allocated_id(&_parent->_load_id);
+ request->set_index_id(_index_channel->_index_id);
+ request->set_txn_id(_parent->_txn_id);
+ request->set_allocated_schema(_parent->_schema->to_protobuf());
std::set<int64_t> deduper;
for (auto& tablet : _all_tablets) {
if (deduper.contains(tablet.tablet_id)) {
continue;
}
- auto ptablet = request.add_tablets();
+ auto ptablet = request->add_tablets();
ptablet->set_partition_id(tablet.partition_id);
ptablet->set_tablet_id(tablet.tablet_id);
deduper.insert(tablet.tablet_id);
}
- request.set_num_senders(_parent->_num_senders);
- request.set_need_gen_rollup(false); // Useless but it is a required field
in pb
- request.set_load_mem_limit(_parent->_load_mem_limit);
- request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
- request.set_is_high_priority(_parent->_is_high_priority);
- request.set_sender_ip(BackendOptions::get_localhost());
- request.set_is_vectorized(true);
- request.set_backend_id(_node_id);
- request.set_enable_profile(_state->enable_profile());
- request.set_is_incremental(is_incremental);
-
- auto* open_closure = new RefCountClosure<PTabletWriterOpenResult> {};
- open_closure->ref();
-
- open_closure->ref(); // This ref is for RPC's reference
-
open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec *
1000);
+ request->set_num_senders(_parent->_num_senders);
+ request->set_need_gen_rollup(false); // Useless but it is a required field
in pb
+ request->set_load_mem_limit(_parent->_load_mem_limit);
+ request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
+ request->set_is_high_priority(_parent->_is_high_priority);
+ request->set_sender_ip(BackendOptions::get_localhost());
+ request->set_is_vectorized(true);
+ request->set_backend_id(_node_id);
+ request->set_enable_profile(_state->enable_profile());
+ request->set_is_incremental(is_incremental);
+
+ auto open_callback =
DummyBrpcCallback<PTabletWriterOpenResult>::create_shared();
+ auto open_closure = AutoReleaseClosure<
+ PTabletWriterOpenRequest,
+
DummyBrpcCallback<PTabletWriterOpenResult>>::create_unique(request,
open_callback);
+
open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec
* 1000);
Review Comment:
warning: 1000 is a magic number; consider replacing it with a named constant
[readability-magic-numbers]
```cpp
open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec
* 1000);
^
```
##########
be/src/vec/sink/writer/vtablet_writer.h:
##########
@@ -120,26 +119,21 @@ struct AddBatchCounter {
// It's very error-prone to guarantee the handler capture vars' & this
closure's destruct sequence.
// So using create() to get the closure pointer is recommended. We can delete
the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after
ReusableClosure deleted.
+// Delete this point is safe, don't worry about RPC callback will run after
WriteBlockCallback deleted.
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send,
`clear_in_flight` after rpc failure or callback,
// then next send will start, and it will wait for the rpc callback to
complete when it is destroyed.
template <typename T>
-class ReusableClosure final : public google::protobuf::Closure {
-public:
- ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
- ~ReusableClosure() override {
- // shouldn't delete when Run() is calling or going to be called, wait
for current Run() done.
- join();
- SCOPED_TRACK_MEMORY_TO_UNKNOWN();
- cntl.Reset();
- }
+class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
+ ENABLE_FACTORY_CREATOR(WriteBlockCallback);
- static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+public:
+ WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
Review Comment:
warning: use '= default' to define a trivial default constructor
[modernize-use-equals-default]
```suggestion
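// NOTE: a constructor cannot combine a member-initializer list with '= default'.
// Give `cid` a default member initializer (`= INVALID_BTHREAD_ID`) at its declaration, then:
WriteBlockCallback() = default;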
```
##########
be/src/vec/sink/writer/vtablet_writer.h:
##########
@@ -120,26 +119,21 @@
// It's very error-prone to guarantee the handler capture vars' & this
closure's destruct sequence.
// So using create() to get the closure pointer is recommended. We can delete
the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after
ReusableClosure deleted.
+// Delete this point is safe, don't worry about RPC callback will run after
WriteBlockCallback deleted.
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send,
`clear_in_flight` after rpc failure or callback,
// then next send will start, and it will wait for the rpc callback to
complete when it is destroyed.
template <typename T>
-class ReusableClosure final : public google::protobuf::Closure {
-public:
- ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
- ~ReusableClosure() override {
- // shouldn't delete when Run() is calling or going to be called, wait
for current Run() done.
- join();
- SCOPED_TRACK_MEMORY_TO_UNKNOWN();
- cntl.Reset();
- }
+class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
+ ENABLE_FACTORY_CREATOR(WriteBlockCallback);
- static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+public:
+ WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
+ virtual ~WriteBlockCallback() override {}
Review Comment:
warning: 'virtual' is redundant since the function is already declared
'override' [modernize-use-override]
```suggestion
~WriteBlockCallback() override {}
```
##########
be/src/vec/sink/writer/vtablet_writer.h:
##########
@@ -120,26 +119,21 @@
// It's very error-prone to guarantee the handler capture vars' & this
closure's destruct sequence.
// So using create() to get the closure pointer is recommended. We can delete
the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after
ReusableClosure deleted.
+// Delete this point is safe, don't worry about RPC callback will run after
WriteBlockCallback deleted.
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send,
`clear_in_flight` after rpc failure or callback,
// then next send will start, and it will wait for the rpc callback to
complete when it is destroyed.
template <typename T>
-class ReusableClosure final : public google::protobuf::Closure {
-public:
- ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
- ~ReusableClosure() override {
- // shouldn't delete when Run() is calling or going to be called, wait
for current Run() done.
- join();
- SCOPED_TRACK_MEMORY_TO_UNKNOWN();
- cntl.Reset();
- }
+class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
+ ENABLE_FACTORY_CREATOR(WriteBlockCallback);
- static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+public:
+ WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
+ virtual ~WriteBlockCallback() override {}
Review Comment:
warning: use '= default' to define a trivial destructor
[modernize-use-equals-default]
```suggestion
~WriteBlockCallback() override = default;
```
##########
be/src/vec/sink/writer/vtablet_writer.h:
##########
@@ -120,26 +119,21 @@
// It's very error-prone to guarantee the handler capture vars' & this
closure's destruct sequence.
// So using create() to get the closure pointer is recommended. We can delete
the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after
ReusableClosure deleted.
+// Delete this point is safe, don't worry about RPC callback will run after
WriteBlockCallback deleted.
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send,
`clear_in_flight` after rpc failure or callback,
// then next send will start, and it will wait for the rpc callback to
complete when it is destroyed.
template <typename T>
-class ReusableClosure final : public google::protobuf::Closure {
-public:
- ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
- ~ReusableClosure() override {
- // shouldn't delete when Run() is calling or going to be called, wait
for current Run() done.
- join();
- SCOPED_TRACK_MEMORY_TO_UNKNOWN();
- cntl.Reset();
- }
+class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
+ ENABLE_FACTORY_CREATOR(WriteBlockCallback);
- static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+public:
+ WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
+ virtual ~WriteBlockCallback() override {}
void addFailedHandler(const std::function<void(bool)>& fn) {
failed_handler = fn; }
void addSuccessHandler(const std::function<void(const T&, bool)>& fn) {
success_handler = fn; }
- void join() {
+ virtual void join() override {
Review Comment:
warning: 'virtual' is redundant since the function is already declared
'override' [modernize-use-override]
```suggestion
void join() override {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]