This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e70a3ab8b16 branch-2.1: [fix] (streamload) fixed the issue of data
loss due to concurrency wh… #48948 (#49937)
e70a3ab8b16 is described below
commit e70a3ab8b167afc5f57086263712191990d5690a
Author: Xin Liao <[email protected]>
AuthorDate: Fri Apr 11 15:10:42 2025 +0800
branch-2.1: [fix] (streamload) fixed the issue of data loss due to
concurrency wh… #48948 (#49937)
cherry pick from #48948
Co-authored-by: kang <[email protected]>
Co-authored-by: lik40 <[email protected]>
---
be/src/vec/sink/writer/async_result_writer.cpp | 54 +++++++++++++-------------
1 file changed, 28 insertions(+), 26 deletions(-)
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index aa85a83756f..ec43586526c 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -125,41 +125,43 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
}
}
- if (_writer_status.ok()) {
- while (true) {
- ThreadCpuStopWatch cpu_time_stop_watch;
- cpu_time_stop_watch.start();
- Defer defer {[&]() {
- if (state && state->get_query_ctx()) {
- state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
- }
- }};
- if (!_eos && _data_queue.empty() && _writer_status.ok()) {
- std::unique_lock l(_m);
- while (!_eos && _data_queue.empty() && _writer_status.ok()) {
- // Add 1s to check to avoid lost signal
- _cv.wait_for(l, std::chrono::seconds(1));
- }
+ while (_writer_status.ok()) {
+ ThreadCpuStopWatch cpu_time_stop_watch;
+ cpu_time_stop_watch.start();
+ Defer defer {[&]() {
+ if (state && state->get_query_ctx()) {
+ state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
+ }
+ }};
+
+ //1) wait scan operator write data
+ {
+ std::unique_lock l(_m);
+ while (!_eos && _data_queue.empty() && _writer_status.ok()) {
+ // Add 1s to check to avoid lost signal
+ _cv.wait_for(l, std::chrono::seconds(1));
}
+ //check if eos or writer error
if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
_data_queue.clear();
break;
}
+ }
- auto block = _get_block_from_queue();
- auto status = write(*block);
- if (!status.ok()) [[unlikely]] {
- std::unique_lock l(_m);
- _writer_status.update(status);
- if (_dependency && _is_finished()) {
- _dependency->set_ready();
- }
- break;
+ //2) get the block from data queue and write to downstream
+ auto block = _get_block_from_queue();
+ auto status = write(*block);
+ if (!status.ok()) [[unlikely]] {
+ std::unique_lock l(_m);
+ _writer_status.update(status);
+ if (_dependency && _is_finished()) {
+ _dependency->set_ready();
}
-
- _return_free_block(std::move(block));
+ break;
}
+
+ _return_free_block(std::move(block));
}
bool need_finish = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]