This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b86dd11a7d [fix](pipeline) refactor olap table sink close (#20771)
b86dd11a7d is described below
commit b86dd11a7d7aed5696cfa1eccb5f4af9889f1ae8
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Jul 4 11:27:51 2023 +0800
[fix](pipeline) refactor olap table sink close (#20771)
For pipeline, olap table sink close is divided into three stages:
try_close() --> pending_finish() --> close()
Only after all node channels are done or canceled will pending_finish()
return false and close() start.
This avoids blocking the pipeline on close().
In close(), the index channel's intolerable-failure status is checked after
each node channel failure; if an intolerable failure is found, close is
terminated early and all node channels are canceled to avoid meaningless
blocking.
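Below is a minimal, self-contained C++ sketch of the three-stage close
protocol described above. The names (FakeNodeChannel, FakeSink, drive_close)
are illustrative stand-ins for this example only, not the actual Doris classes:

    #include <atomic>
    #include <thread>
    #include <vector>

    // Hypothetical stand-in for doris::Status, for illustration only.
    struct Status {
        bool ok_ = true;
        static Status OK() { return {}; }
        bool ok() const { return ok_; }
    };

    struct FakeNodeChannel {
        std::atomic<bool> rpc_done {false}; // set by the last rpc callback
    };

    struct FakeSink {
        std::vector<FakeNodeChannel*> channels;
        bool tried_close = false;

        // Stage 1: non-blocking. Marks every channel closed (or cancels it);
        // never waits for rpc responses.
        void try_close(Status /*exec_status*/) {
            if (tried_close) {
                return;
            }
            tried_close = true;
        }

        // Stage 2: polled by the scheduler; pending_finish() == !is_close_done().
        bool is_close_done() const {
            for (const auto* ch : channels) {
                if (!ch->rpc_done.load()) {
                    return false;
                }
            }
            return true;
        }

        // Stage 3: every channel's rpc is already done, so this cannot block.
        Status close() { return Status::OK(); }
    };

    Status drive_close(FakeSink& sink) {
        sink.try_close(Status::OK());
        while (!sink.is_close_done()) {
            std::this_thread::yield();
        }
        return sink.close();
    }

A real pipeline scheduler would requeue the task between is_close_done()
polls instead of spinning; that is what lets the worker thread avoid being
parked inside close().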
---
be/src/exec/data_sink.h | 5 +
be/src/pipeline/exec/operator.h | 9 +-
be/src/pipeline/exec/scan_operator.cpp | 4 +-
be/src/pipeline/exec/scan_operator.h | 2 +-
be/src/pipeline/pipeline_task.cpp | 3 +-
be/src/vec/exec/scan/vscan_node.cpp | 2 +-
be/src/vec/exec/scan/vscan_node.h | 2 +-
be/src/vec/sink/vtablet_sink.cpp | 272 +++++++++++++++++++++------------
be/src/vec/sink/vtablet_sink.h | 32 +++-
9 files changed, 227 insertions(+), 104 deletions(-)
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 542dc428cc..cf7b774fcd 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -66,6 +66,11 @@ public:
virtual Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) {
return Status::NotSupported("Not support send block");
}
+
+ virtual void try_close(RuntimeState* state, Status exec_status) {}
+
+ virtual bool is_close_done() { return true; }
+
// Releases all resources that were allocated in prepare()/send().
// Further send() calls are illegal after calling close().
// It must be okay to call this multiple times. Subsequent calls should
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index fcd22eedcb..12a117b4c4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -231,7 +231,7 @@ public:
*/
virtual bool is_pending_finish() const { return false; }
- virtual Status try_close() { return Status::OK(); }
+ virtual Status try_close(RuntimeState* state) { return Status::OK(); }
bool is_closed() const { return _is_closed; }
@@ -284,6 +284,13 @@ public:
return Status::OK();
}
+ Status try_close(RuntimeState* state) override {
+ _sink->try_close(state, state->query_status());
+ return Status::OK();
+ }
+
+ [[nodiscard]] bool is_pending_finish() const override { return !_sink->is_close_done(); }
+
Status close(RuntimeState* state) override {
if (is_closed()) {
return Status::OK();
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 83e41d93ba..f34461a9fd 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -56,8 +56,8 @@ bool ScanOperator::is_pending_finish() const {
return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
}
-Status ScanOperator::try_close() {
- return _node->try_close();
+Status ScanOperator::try_close(RuntimeState* state) {
+ return _node->try_close(state);
}
bool ScanOperator::runtime_filters_are_ready_or_timeout() {
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index dbcf8f0ed7..850a1ab020 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -50,7 +50,7 @@ public:
std::string debug_string() const override;
- Status try_close() override;
+ Status try_close(RuntimeState* state) override;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index a6dfd238c2..11aeb620fa 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -265,7 +265,8 @@ Status PipelineTask::finalize() {
}
Status PipelineTask::try_close() {
- return _source->try_close();
+ _sink->try_close(_state);
+ return _source->try_close(_state);
}
Status PipelineTask::close() {
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 2af6fc87c5..aaefef32ee 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -334,7 +334,7 @@ void VScanNode::release_resource(RuntimeState* state) {
ExecNode::release_resource(state);
}
-Status VScanNode::try_close() {
+Status VScanNode::try_close(RuntimeState* state) {
if (_scanner_ctx.get()) {
// mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
// TODO: there is a lock in `set_should_stop` may cause some slight impact
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 112ca47b54..ee0dadefdc 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -154,7 +154,7 @@ public:
Status alloc_resource(RuntimeState* state) override;
void release_resource(RuntimeState* state) override;
- Status try_close();
+ Status try_close(RuntimeState* state);
bool should_run_serial() const {
return _should_run_serial || _state->enable_scan_node_run_serial();
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index f95c2b13c5..e1870e48e8 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -119,6 +119,7 @@ public:
vnode_channel->channel_info(), ss.str()),
-1);
}
+ done = true;
}
void join() { brpc::Join(cntl.call_id()); }
@@ -128,6 +129,7 @@ public:
VNodeChannel* vnode_channel;
IndexChannel* index_channel;
int64_t partition_id;
+ std::atomic<bool> done {false};
};
IndexChannel::~IndexChannel() {}
@@ -552,6 +554,15 @@ void VNodeChannel::open_partition_wait() {
}
}
+bool VNodeChannel::open_partition_finished() const {
+ for (auto& open_partition_closure : _open_partition_closures) {
+ if (!open_partition_closure->done) {
+ return false;
+ }
+ }
+ return true;
+}
+
Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
if (payload->second.empty()) {
@@ -681,6 +692,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
}
if (!_add_block_closure->try_set_in_flight()) {
+ // There is a packet in flight, skip.
return _send_finished ? 0 : 1;
}
@@ -858,6 +870,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
}
void VNodeChannel::cancel(const std::string& cancel_msg) {
+ if (_is_closed) {
// skip channels that have already been canceled or are in close_wait().
+ return;
+ }
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// set _is_closed to true finally
Defer set_closed {[&]() {
@@ -888,6 +904,11 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
request.release_id();
}
+bool VNodeChannel::is_rpc_done() const {
+ return (_add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight())) &&
+ open_partition_finished();
+}
+
Status VNodeChannel::close_wait(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// set _is_closed to true finally
@@ -908,6 +929,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
}
// waiting for finished, it may take a long time, so we couldn't set a timeout
+ // In pipeline, close() starts only after is_close_done() returns true, so this will not block.
while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
bthread_usleep(1000);
@@ -925,15 +947,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
return Status::OK();
}
- std::stringstream ss;
- ss << "close wait failed coz rpc error";
- {
- std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
- if (_cancel_msg != "") {
- ss << ". " << _cancel_msg;
- }
- }
- return Status::InternalError(ss.str());
+ return Status::InternalError(get_cancel_msg());
}
void VNodeChannel::_close_check() {
@@ -1121,14 +1135,14 @@ Status VOlapTableSink::open(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
fmt::memory_buffer buf;
- for (auto index_channel : _channels) {
+ for (const auto& index_channel : _channels) {
fmt::format_to(buf, "index id:{}", index_channel->_index_id);
index_channel->for_each_node_channel(
[](const std::shared_ptr<VNodeChannel>& ch) { ch->open(); });
}
VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf);
- for (auto index_channel : _channels) {
+ for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel([&index_channel](const std::shared_ptr<VNodeChannel>& ch) {
auto st = ch->open_wait();
@@ -1181,7 +1195,7 @@ void VOlapTableSink::_send_batch_process() {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
while (true) {
int running_channels_num = 0;
- for (auto index_channel : _channels) {
+ for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel([&running_channels_num, this](const std::shared_ptr<VNodeChannel>& ch) {
running_channels_num +=
@@ -1201,8 +1215,8 @@ void VOlapTableSink::_send_batch_process() {
size_t VOlapTableSink::get_pending_bytes() const {
size_t mem_consumption = 0;
- for (auto& indexChannel : _channels) {
- mem_consumption += indexChannel->get_pending_bytes();
+ for (const auto& index_channel : _channels) {
+ mem_consumption += index_channel->get_pending_bytes();
}
return mem_consumption;
}
@@ -1453,13 +1467,95 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
return Status::OK();
}
+Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
+ Status status, const std::string& err_msg, const std::shared_ptr<IndexChannel> ich,
+ const std::shared_ptr<VNodeChannel> nch) {
+ LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg;
+ ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
+ // cancel the node channel in best effort
+ nch->cancel(err_msg);
+
+ // check if index has intolerable failure
+ Status index_st = ich->check_intolerable_failure();
+ if (!index_st.ok()) {
+ status = index_st;
+ } else if (Status st = ich->check_tablet_received_rows_consistency(); !st.ok()) {
+ status = st;
+ }
+ return status;
+}
+
+void VOlapTableSink::_cancel_all_channel(Status status) {
+ for (const auto& index_channel : _channels) {
+ index_channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
+ ch->cancel(status.to_string());
+ });
+ }
+ LOG(INFO) << fmt::format(
+ "close olap table sink. load_id={}, txn_id={}, canceled all node
channels due to "
+ "error: {}",
+ print_id(_load_id), _txn_id, status);
+}
+
+void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
+ if (_try_close) {
+ return;
+ }
+ SCOPED_TIMER(_close_timer);
+ Status status = exec_status;
+ if (status.ok()) {
+ // only if status is ok can we call this _profile->total_time_counter().
+ // if status is not ok, this sink may not be prepared, so that _profile is null
+ SCOPED_TIMER(_profile->total_time_counter());
+ {
+ for (const auto& index_channel : _channels) {
+ if (!status.ok()) {
+ break;
+ }
+ index_channel->for_each_node_channel(
+ [this, &index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
+ if (!status.ok() || ch->is_closed()) {
+ return;
+ }
+ // only first try close, all node channels will mark_close()
+ ch->mark_close();
+ if (ch->is_cancelled()) {
+ status = this->_cancel_channel_and_check_intolerable_failure(
+ status, ch->get_cancel_msg(), index_channel, ch);
+ }
+ });
+ } // end for index channels
+ }
+ }
+
+ if (!status.ok()) {
+ _cancel_all_channel(status);
+ _close_status = status;
+ _try_close = true;
+ }
+}
+
+bool VOlapTableSink::is_close_done() {
+ bool close_done = true;
+ for (const auto& index_channel : _channels) {
+ index_channel->for_each_node_channel(
+ [&close_done](const std::shared_ptr<VNodeChannel>& ch) {
+ close_done &= ch->is_rpc_done();
+ });
+ }
+ return close_done;
+}
+
Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return _close_status;
}
+ try_close(state, exec_status);
SCOPED_TIMER(_close_timer);
- Status status = exec_status;
- if (status.ok()) {
+ // If _close_status is not ok, all nodes have been canceled in try_close.
+ if (_close_status.ok()) {
+ DCHECK(exec_status.ok());
+ auto status = Status::OK();
// only if status is ok can we call this _profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that _profile is null
SCOPED_TIMER(_profile->total_time_counter());
@@ -1472,7 +1568,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
VNodeChannelStat channel_stat;
{
if (config::enable_lazy_open_partition) {
- for (auto index_channel : _channels) {
+ for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel(
[](const std::shared_ptr<VNodeChannel>& ch) {
ch->open_partition_wait();
@@ -1480,30 +1576,27 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
}
}
- for (auto index_channel : _channels) {
- index_channel->for_each_node_channel(
- [](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
- num_node_channels += index_channel->num_node_channels();
- }
-
- for (auto index_channel : _channels) {
+ for (const auto& index_channel : _channels) {
+ if (!status.ok()) {
+ break;
+ }
int64_t add_batch_exec_time = 0;
int64_t wait_exec_time = 0;
index_channel->for_each_node_channel(
- [&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
- &channel_stat, &queue_push_lock_ns, &actual_consume_ns,
- &total_add_batch_exec_time_ns, &add_batch_exec_time,
+ [this, &index_channel, &status, &state, &node_add_batch_counter_map,
+ &serialize_batch_ns, &channel_stat, &queue_push_lock_ns,
+ &actual_consume_ns, &total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_wait_exec_time_ns, &wait_exec_time,
&total_add_batch_num](const std::shared_ptr<VNodeChannel>& ch) {
+ if (!status.ok() || ch->is_closed()) {
+ return;
+ }
+ // in pipeline, all node channels are done or canceled, will not block.
+ // in non-pipeline mode, close may block waiting.
auto s = ch->close_wait(state);
if (!s.ok()) {
- auto err_msg = s.to_string();
- index_channel->mark_as_failed(ch->node_id(), ch->host(), err_msg,
- -1);
- // cancel the node channel in best effort
- ch->cancel(err_msg);
- LOG(WARNING) << ch->channel_info()
- << ", close channel failed, err: " << err_msg;
+ status = this->_cancel_channel_and_check_intolerable_failure(
+ status, s.to_string(), index_channel, ch);
}
ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
&channel_stat, &queue_push_lock_ns, &actual_consume_ns,
@@ -1511,75 +1604,63 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
&total_wait_exec_time_ns, &wait_exec_time,
&total_add_batch_num);
});
-
+ num_node_channels += index_channel->num_node_channels();
if (add_batch_exec_time > max_add_batch_exec_time_ns) {
max_add_batch_exec_time_ns = add_batch_exec_time;
}
if (wait_exec_time > max_wait_exec_time_ns) {
max_wait_exec_time_ns = wait_exec_time;
}
-
- // check if index has intolerable failure
- Status index_st = index_channel->check_intolerable_failure();
- if (!index_st.ok()) {
- status = index_st;
- } else if (Status st = index_channel->check_tablet_received_rows_consistency();
- !st.ok()) {
- status = st;
- }
} // end for index channels
}
- // TODO need to be improved
- LOG(INFO) << "total mem_exceeded_block_ns=" <<
channel_stat.mem_exceeded_block_ns
- << ", total queue_push_lock_ns=" << queue_push_lock_ns
- << ", total actual_consume_ns=" << actual_consume_ns
- << ", load id=" << print_id(_load_id);
-
- COUNTER_SET(_input_rows_counter, _number_input_rows);
- COUNTER_SET(_output_rows_counter, _number_output_rows);
- COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
- COUNTER_SET(_send_data_timer, _send_data_ns);
- COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
- COUNTER_SET(_filter_timer, _filter_ns);
- COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns);
- COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
- COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
- COUNTER_SET(_validate_data_timer, _validate_data_ns);
- COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
- COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
- COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
- COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
- COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
- COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
- COUNTER_SET(_add_batch_number, total_add_batch_num);
- COUNTER_SET(_num_node_channels, num_node_channels);
- // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
- int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
- state->num_rows_load_unselected();
- state->set_num_rows_load_total(num_rows_load_total);
- state->update_num_rows_load_filtered(_number_filtered_rows);
- state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
-
- // print log of add batch time of all node, for tracing load performance easily
- std::stringstream ss;
- ss << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
- << ", txn_id=" << _txn_id
- << ", node add batch time(ms)/wait execution time(ms)/close
time(ms)/num: ";
- for (auto const& pair : node_add_batch_counter_map) {
- ss << "{" << pair.first << ":(" <<
(pair.second.add_batch_execution_time_us / 1000)
- << ")(" << (pair.second.add_batch_wait_execution_time_us /
1000) << ")("
- << pair.second.close_wait_time_ms << ")(" <<
pair.second.add_batch_num << ")} ";
- }
- LOG(INFO) << ss.str();
- } else {
- for (auto channel : _channels) {
- channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
- ch->cancel(status.to_string());
- });
+
+ if (status.ok()) {
+ // TODO need to be improved
+ LOG(INFO) << "total mem_exceeded_block_ns=" <<
channel_stat.mem_exceeded_block_ns
+ << ", total queue_push_lock_ns=" << queue_push_lock_ns
+ << ", total actual_consume_ns=" << actual_consume_ns
+ << ", load id=" << print_id(_load_id);
+
+ COUNTER_SET(_input_rows_counter, _number_input_rows);
+ COUNTER_SET(_output_rows_counter, _number_output_rows);
+ COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
+ COUNTER_SET(_send_data_timer, _send_data_ns);
+ COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
+ COUNTER_SET(_filter_timer, _filter_ns);
+ COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns);
+ COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
+ COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
+ COUNTER_SET(_validate_data_timer, _validate_data_ns);
+ COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
+ COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
+ COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
+ COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
+ COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
+ COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
+ COUNTER_SET(_add_batch_number, total_add_batch_num);
+ COUNTER_SET(_num_node_channels, num_node_channels);
+ // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
+ int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
+ state->num_rows_load_unselected();
+ state->set_num_rows_load_total(num_rows_load_total);
+ state->update_num_rows_load_filtered(_number_filtered_rows);
+ state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
+
+ // print log of add batch time of all node, for tracing load performance easily
+ std::stringstream ss;
+ ss << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
+ << ", txn_id=" << _txn_id
+ << ", node add batch time(ms)/wait execution time(ms)/close
time(ms)/num: ";
+ for (auto const& pair : node_add_batch_counter_map) {
+ ss << "{" << pair.first << ":(" <<
(pair.second.add_batch_execution_time_us / 1000)
+ << ")(" << (pair.second.add_batch_wait_execution_time_us /
1000) << ")("
+ << pair.second.close_wait_time_ms << ")(" <<
pair.second.add_batch_num << ")} ";
+ }
+ LOG(INFO) << ss.str();
+ } else {
+ _cancel_all_channel(status);
}
- LOG(INFO) << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
- << ", txn_id=" << _txn_id
- << ", canceled all node channels due to error: " << status;
+ _close_status = status;
}
// Sender join() must put after node channels mark_close/cancel.
@@ -1592,9 +1673,8 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
_send_batch_thread_pool_token->wait();
}
- _close_status = status;
DataSink::close(state, exec_status);
- return status;
+ return _close_status;
}
} // namespace stream_load
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 9c608cda43..7f9dc38f5f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -113,6 +113,8 @@ struct AddBatchCounter {
// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted.
+// "Ping-Pong" between sender and receiver: `try_set_in_flight` when sending, `clear_in_flight` after rpc failure or in the callback,
+// only then can the next send start; on destruction it waits for the rpc callback to complete.
template <typename T>
class ReusableClosure final : public google::protobuf::Closure {
public:
@@ -233,6 +235,8 @@ public:
void open_partition_wait();
+ bool open_partition_finished() const;
+
Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false);
int try_send_and_fetch_status(RuntimeState* state,
@@ -247,6 +251,22 @@ public:
// 2. just cancel()
void mark_close();
+ bool is_rpc_done() const;
+
+ bool is_closed() const { return _is_closed; }
+ bool is_cancelled() const { return _cancelled; }
+ std::string get_cancel_msg() {
+ std::stringstream ss;
+ ss << "close wait failed coz rpc error";
+ {
+ std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
+ if (_cancel_msg != "") {
+ ss << ". " << _cancel_msg;
+ }
+ }
+ return ss.str();
+ }
+
// two ways to stop channel:
// 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
// 2. just cancel()
@@ -466,6 +486,9 @@ public:
Status open(RuntimeState* state) override;
+ void try_close(RuntimeState* state, Status exec_status) override;
+ // if true, all node channels' rpcs are done and close() can start.
+ bool is_close_done() override;
Status close(RuntimeState* state, Status close_status) override;
Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
@@ -501,6 +524,12 @@ private:
void _open_partition(const VOlapTablePartition* partition);
+ Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg,
+ const std::shared_ptr<IndexChannel> ich,
+ const std::shared_ptr<VNodeChannel> nch);
+
+ void _cancel_all_channel(Status status);
+
std::shared_ptr<MemTracker> _mem_tracker;
ObjectPool* _pool;
@@ -585,8 +614,9 @@ private:
int64_t _load_channel_timeout_s = 0;
int32_t _send_batch_parallelism = 1;
- // Save the status of close() method
+ // Save the status of try_close() and close() method
Status _close_status;
+ bool _try_close = false;
// User can change this config at runtime, avoid it being modified during query or loading process.
bool _transfer_large_data_by_brpc = false;