This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e5b0d7a5cd [CTE](eof) Support cte reuse reduce counter by eof status and pipeline task mem can release (#20056)
e5b0d7a5cd is described below
commit e5b0d7a5cd8c0dfd5492b6e0a2ecc3ef812a7b6e
Author: HappenLee <[email protected]>
AuthorDate: Fri May 26 22:03:29 2023 +0800
[CTE](eof) Support cte reuse reduce counter by eof status and pipeline task mem can release (#20056)
---
.../exec/multi_cast_data_stream_source.cpp | 6 ++++++
.../pipeline/exec/multi_cast_data_stream_source.h | 2 ++
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 25 ++++++++++++++++++++--
be/src/pipeline/exec/multi_cast_data_streamer.h | 6 ++++--
be/src/pipeline/pipeline_task.cpp | 4 +++-
be/src/vec/sink/vdata_stream_sender.cpp | 5 -----
6 files changed, 38 insertions(+), 10 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 5334d2a669..9854d63120 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -61,4 +61,10 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
}
return Status::OK();
}
+
+Status MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) {
+ _multi_cast_data_streamer->close_sender(_consumer_id);
+ return OperatorBase::close(state);
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index c44b37ee2e..3198c4a408 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -69,6 +69,8 @@ public:
bool can_read() override;
+ Status close(doris::RuntimeState* state) override;
+
private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 2262fb3930..3929c6ced0 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -48,13 +48,33 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
}
-void MultiCastDataStreamer::push(RuntimeState* state,
doris::vectorized::Block* block, bool eos) {
+void MultiCastDataStreamer::close_sender(int sender_idx) {
+ std::lock_guard l(_mutex);
+ auto& pos_to_pull = _sender_pos_to_read[sender_idx];
+ while (pos_to_pull != _multi_cast_blocks.end()) {
+ if (pos_to_pull->_used_count == 1) {
+ DCHECK(pos_to_pull == _multi_cast_blocks.begin());
+ _cumulative_mem_size -= pos_to_pull->_mem_size;
+ pos_to_pull++;
+ _multi_cast_blocks.pop_front();
+ } else {
+ pos_to_pull->_used_count--;
+ pos_to_pull++;
+ }
+ }
+ _closed_sender_count++;
+}
+
+Status MultiCastDataStreamer::push(RuntimeState* state,
doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);
auto block_mem_size = block->allocated_bytes();
std::lock_guard l(_mutex);
- int need_process_count = _cast_sender_count - _opened_sender_count;
+ int need_process_count = _cast_sender_count - _closed_sender_count;
+ if (need_process_count == 0) {
+ return Status::EndOfFile("All data streamer is EOF");
+ }
// TODO: if the [queue back block rows + block->rows()] < batch_size,
better
// do merge block. but need check the need_process_count and used_count
whether
// equal
@@ -70,6 +90,7 @@ void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block*
}
}
_eos = eos;
+ return Status::OK();
}
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 022761bf3d..92c0e24079 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -46,7 +46,9 @@ public:
void pull(int sender_idx, vectorized::Block* block, bool* eos);
- void push(RuntimeState* state, vectorized::Block* block, bool eos);
+ void close_sender(int sender_idx);
+
+ Status push(RuntimeState* state, vectorized::Block* block, bool eos);
// use sink to check can_write, now always true after we support spill to
disk
bool can_write() { return true; }
@@ -73,7 +75,7 @@ private:
std::mutex _mutex;
bool _eos = false;
int _cast_sender_count = 0;
- int _opened_sender_count = 0;
+ int _closed_sender_count = 0;
int64_t _cumulative_mem_size = 0;
RuntimeProfile::Counter* _process_rows;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 41c4e6b549..853cd8ec0d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -218,10 +218,12 @@ Status PipelineTask::execute(bool* eos) {
*eos = _data_state == SourceState::FINISHED;
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
- RETURN_IF_ERROR(_sink->sink(_state, block, _data_state));
+ auto status = _sink->sink(_state, block, _data_state);
+ *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
if (*eos) { // just return, the scheduler will do finish work
break;
}
+ RETURN_IF_ERROR(status);
}
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index a72c86d44d..6b03753c84 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -17,12 +17,10 @@
#include "vec/sink/vdata_stream_sender.h"
-#include <butil/iobuf_inl.h>
#include <fmt/format.h>
#include <fmt/ranges.h> // IWYU pragma: keep
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Metrics_types.h>
-#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <opentelemetry/nostd/shared_ptr.h>
@@ -36,13 +34,10 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
-#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
-#include "util/brpc_client_cache.h"
#include "util/proto_util.h"
#include "util/telemetry/telemetry.h"
#include "vec/common/sip_hash.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]