This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 3b20518bd73 [fix](load) fix table sink memory usage counter (#47248)
3b20518bd73 is described below
commit 3b20518bd7360888ca5eba4f9821e9c913978476
Author: TengJianPing <[email protected]>
AuthorDate: Fri Jan 24 05:08:30 2025 +0800
[fix](load) fix table sink memory usage counter (#47248)
---
be/src/olap/memtable_writer.cpp | 4 ++--
be/src/pipeline/exec/olap_table_sink_v2_operator.h | 2 +-
be/src/pipeline/local_exchange/local_exchanger.h | 3 ++-
be/src/vec/core/block.cpp | 17 ++++++++++----
be/src/vec/sink/writer/async_result_writer.cpp | 26 ++++++++++++++++++----
be/src/vec/sink/writer/async_result_writer.h | 10 +++++----
.../sink/writer/iceberg/viceberg_table_writer.cpp | 1 -
.../sink/writer/iceberg/viceberg_table_writer.h | 1 -
be/src/vec/sink/writer/vhive_table_writer.h | 1 -
be/src/vec/sink/writer/vtablet_writer.h | 3 +--
be/src/vec/sink/writer/vtablet_writer_v2.h | 3 +--
11 files changed, 48 insertions(+), 23 deletions(-)
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 6d63890eede..cecad2ceef4 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -158,8 +158,8 @@ Status MemTableWriter::flush_async() {
}
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
- << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
- << ", load id: " << print_id(_req.load_id);
+ << PrettyPrinter::print_bytes(_mem_table->memory_usage())
+ << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
auto s = _flush_memtable_async();
_reset_mem_table();
return s;
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index da5986d09ee..aa030ccf28e 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -68,7 +68,7 @@ public:
void set_low_memory_mode(RuntimeState* state) override {
auto& local_state = get_local_state(state);
- local_state._writer->clear_free_blocks();
+ local_state._writer->set_low_memory_mode();
}
private:
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 5684b418ff2..c1450cb7c69 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -24,7 +24,8 @@ namespace doris {
#include "common/compile_check_begin.h"
namespace vectorized {
template <typename T>
-void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks);
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
+ RuntimeProfile::Counter* memory_used_counter = nullptr);
class PartitionerBase;
} // namespace vectorized
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8584e5df3cd..bed9f7279ed 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -61,15 +61,24 @@ enum CompressionTypePB : int;
namespace doris::vectorized {
template <typename T>
-void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks) {
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
+ RuntimeProfile::Counter* memory_used_counter = nullptr) {
T block;
while (blocks.try_dequeue(block)) {
- // do nothing
+ if (memory_used_counter) {
+ if constexpr (std::is_same_v<T, Block>) {
+ memory_used_counter->update(-block.allocated_bytes());
+ } else {
+ memory_used_counter->update(-block->allocated_bytes());
+ }
+ }
}
}
-template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&);
-template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&);
+template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&,
+ RuntimeProfile::Counter* memory_used_counter);
+template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&,
+ RuntimeProfile::Counter* memory_used_counter);
Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {
initialize_index_by_name();
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 28756cfe78f..5a6738191bf 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -58,6 +58,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
_dependency->set_ready();
}
if (rows) {
+ _memory_used_counter->update(add_block->allocated_bytes());
_data_queue.emplace_back(std::move(add_block));
if (!_data_queue_is_available() && !_is_finished()) {
_dependency->block();
@@ -81,10 +82,18 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
if (_data_queue_is_available()) {
_dependency->set_ready();
}
+ _memory_used_counter->update(-block->allocated_bytes());
return block;
}
Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) {
+ // Attention!!!
+ // AsyncResultWriter::open is called asynchronously,
+ // so we need to set up the profile and memory counter here,
+ // or else the counter can be nullptr when AsyncResultWriter::sink is called.
+ _profile = profile;
+ _memory_used_counter = _profile->get_counter("MemoryUsage");
+
// Should set to false here, to
DCHECK(_finish_dependency);
_finish_dependency->block();
@@ -222,7 +231,12 @@ void AsyncResultWriter::force_close(Status s) {
}
void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) {
- _free_blocks.enqueue(std::move(b));
+ if (!_low_memory_mode) {
+ auto allocated_bytes = b->allocated_bytes();
+ if (_free_blocks.enqueue(std::move(b))) {
+ _memory_used_counter->update(allocated_bytes);
+ }
+ }
}
std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Block* block,
@@ -230,15 +244,19 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo
std::unique_ptr<Block> b;
if (!_free_blocks.try_dequeue(b)) {
b = block->create_same_struct_block(rows, true);
+ } else {
+ _memory_used_counter->update(-b->allocated_bytes());
}
b->swap(*block);
return b;
}
template <typename T>
-void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks);
-void AsyncResultWriter::clear_free_blocks() {
- clear_blocks(_free_blocks);
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
+ RuntimeProfile::Counter* memory_used_counter = nullptr);
+void AsyncResultWriter::set_low_memory_mode() {
+ _low_memory_mode = true;
+ clear_blocks(_free_blocks, _memory_used_counter);
}
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index a1265655c58..dc6b90a6c27 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -22,13 +22,13 @@
#include <queue> // IWYU pragma: keep
#include "runtime/result_writer.h"
+#include "util/runtime_profile.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris {
class ObjectPool;
class RowDescriptor;
class RuntimeState;
-class RuntimeProfile;
class TDataSink;
class TExpr;
@@ -72,22 +72,22 @@ public:
Status get_writer_status() { return _writer_status.status(); }
- void clear_free_blocks();
+ void set_low_memory_mode();
protected:
Status _projection_block(Block& input_block, Block* output_block);
const VExprContextSPtrs& _vec_output_expr_ctxs;
+ RuntimeProfile* _profile = nullptr; // not owned, set when open
std::unique_ptr<Block> _get_free_block(Block*, size_t rows);
- void _return_free_block(std::unique_ptr<Block>);
-
private:
void process_block(RuntimeState* state, RuntimeProfile* profile);
[[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
[[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }
void _set_ready_to_finish();
+ void _return_free_block(std::unique_ptr<Block>);
std::unique_ptr<Block> _get_block_from_queue();
static constexpr auto QUEUE_SIZE = 3;
@@ -97,11 +97,13 @@ private:
// Default value is ok
AtomicStatus _writer_status;
bool _eos = false;
+ std::atomic_bool _low_memory_mode = false;
std::shared_ptr<pipeline::Dependency> _dependency;
std::shared_ptr<pipeline::Dependency> _finish_dependency;
moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
+ RuntimeProfile::Counter* _memory_used_counter = nullptr;
};
} // namespace vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 608afced8d9..81d97593e87 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -47,7 +47,6 @@ Status VIcebergTableWriter::init_properties(ObjectPool* pool) {
Status VIcebergTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
- _profile = profile;
// add all counter
_written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index e53c7020a68..b3389d94880 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -116,7 +116,6 @@ private:
// Currently it is a copy, maybe it is better to use move semantics to eliminate it.
TDataSink _t_sink;
RuntimeState* _state = nullptr;
- RuntimeProfile* _profile = nullptr;
std::shared_ptr<doris::iceberg::Schema> _schema;
std::unique_ptr<doris::iceberg::PartitionSpec> _partition_spec;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index 9361fdbc408..1a2cfd3e8d2 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -71,7 +71,6 @@ private:
// Currently it is a copy, maybe it is better to use move semantics to eliminate it.
TDataSink _t_sink;
RuntimeState* _state = nullptr;
- RuntimeProfile* _profile = nullptr;
std::vector<int> _partition_columns_input_index;
std::set<size_t> _non_write_columns_indices;
std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers;
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 52aa0f6b918..62528a57114 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -683,8 +683,7 @@ private:
VOlapTablePartitionParam* _vpartition = nullptr;
- RuntimeState* _state = nullptr; // not owned, set when open
- RuntimeProfile* _profile = nullptr; // not owned, set when open
+ RuntimeState* _state = nullptr; // not owned, set when open
VRowDistribution _row_distribution;
// reuse to avoid frequent memory allocation and release.
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index b6b6b623698..788af3d1213 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -217,8 +217,7 @@ private:
VOlapTablePartitionParam* _vpartition = nullptr;
- RuntimeState* _state = nullptr; // not owned, set when open
- RuntimeProfile* _profile = nullptr; // not owned, set when open
+ RuntimeState* _state = nullptr; // not owned, set when open
std::unordered_set<int64_t> _opened_partitions;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]