This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a95d58b7d1a [Chore](exchange) change
LocalExchangeSharedState::mem_usage to a signed type to avoid queries being blocked when mem_usage goes negative (#36682)
a95d58b7d1a is described below
commit a95d58b7d1a5d74813e0fa6adb7e7d4b4db7e845
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 24 10:00:59 2024 +0800
[Chore](exchange) change LocalExchangeSharedState::mem_usage to a signed type to
avoid queries being blocked when mem_usage goes negative (#36682)
## Proposed changes
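Change `LocalExchangeSharedState::mem_usage` from an unsigned type
(`std::atomic<size_t>`) to a signed type (`std::atomic<int64_t>`), so that a
query is no longer blocked when concurrent enqueue/dequeue accounting
temporarily drives mem_usage below zero. The patch also releases the
accounted memory when an enqueue fails.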
---
be/src/pipeline/dependency.h | 40 +++-----
be/src/pipeline/local_exchange/local_exchanger.cpp | 114 +++++++++++----------
be/src/pipeline/local_exchange/local_exchanger.h | 1 +
3 files changed, 75 insertions(+), 80 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 0f9c698a82e..5214022db13 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -336,9 +336,8 @@ public:
std::vector<size_t> make_nullable_keys;
struct MemoryRecord {
- MemoryRecord() : used_in_arena(0), used_in_state(0) {}
- int64_t used_in_arena;
- int64_t used_in_state;
+ int64_t used_in_arena {};
+ int64_t used_in_state {};
};
MemoryRecord mem_usage_record;
bool agg_data_created_without_key = false;
@@ -362,11 +361,7 @@ public:
_order_directions(order_directions),
_null_directions(null_directions) {}
- HeapLimitCursor(const HeapLimitCursor& other) noexcept
- : _row_id(other._row_id),
- _limit_columns(other._limit_columns),
- _order_directions(other._order_directions),
- _null_directions(other._null_directions) {}
+ HeapLimitCursor(const HeapLimitCursor& other) = default;
HeapLimitCursor(HeapLimitCursor&& other) noexcept
: _row_id(other._row_id),
@@ -567,11 +562,10 @@ public:
};
struct BlockRowPos {
- BlockRowPos() : block_num(0), row_num(0), pos(0) {}
- int64_t block_num; //the pos at which block
- int64_t row_num; //the pos at which row
- int64_t pos; //pos = all blocks size + row_num
- std::string debug_string() {
+ int64_t block_num {}; //the pos at which block
+ int64_t row_num {}; //the pos at which row
+ int64_t pos {}; //pos = all blocks size + row_num
+ std::string debug_string() const {
std::string res = "\t block_num: ";
res += std::to_string(block_num);
res += "\t row_num: ";
@@ -823,14 +817,9 @@ struct DataDistribution {
DataDistribution(ExchangeType type) : distribution_type(type) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>&
partition_exprs_)
: distribution_type(type), partition_exprs(partition_exprs_) {}
- DataDistribution(const DataDistribution& other)
- : distribution_type(other.distribution_type),
partition_exprs(other.partition_exprs) {}
+ DataDistribution(const DataDistribution& other) = default;
bool need_local_exchange() const { return distribution_type !=
ExchangeType::NOOP; }
- DataDistribution& operator=(const DataDistribution& other) {
- distribution_type = other.distribution_type;
- partition_exprs = other.partition_exprs;
- return *this;
- }
+ DataDistribution& operator=(const DataDistribution& other) = default;
ExchangeType distribution_type;
std::vector<TExpr> partition_exprs;
};
@@ -843,13 +832,14 @@ public:
LocalExchangeSharedState(int num_instances);
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
- std::atomic<size_t> mem_usage = 0;
+ std::atomic<int64_t> mem_usage = 0;
+ // We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
void create_source_dependencies(int operator_id, int node_id) {
- for (size_t i = 0; i < source_deps.size(); i++) {
- source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
-
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
- source_deps[i]->set_shared_state(this);
+ for (auto& source_dep : source_deps) {
+ source_dep = std::make_shared<Dependency>(operator_id, node_id,
+
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+ source_dep->set_shared_state(this);
}
};
void sub_running_sink_operators();
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 51d2c8268e7..27b7fc7e7fd 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -74,20 +74,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
return Status::OK();
};
- if (_running_sink_operators == 0) {
- if
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
- SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
- block, partitioned_block.first->data_block);
- RETURN_IF_ERROR(get_data(block));
- } else {
- *eos = true;
- }
- } else if
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
RETURN_IF_ERROR(get_data(block));
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -144,6 +138,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (data_queue[it.second].enqueue({new_block_wrapper,
{row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(it.second);
} else {
+ local_state._shared_state->sub_mem_usage(
+ it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
@@ -162,6 +158,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(i %
_num_sources);
} else {
+ local_state._shared_state->sub_mem_usage(
+ i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(),
+ false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
@@ -181,6 +180,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx,
start, size}})) {
local_state._shared_state->set_ready_to_read(map[i]);
} else {
+ local_state._shared_state->sub_mem_usage(
+ map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
@@ -200,9 +201,12 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
- local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
+ size_t memory_usage = new_block.allocated_bytes();
+ local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
+ } else {
+ local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}
return Status::OK();
@@ -220,25 +224,16 @@ void
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- block->swap(next_block);
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
- block->allocated_bytes());
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit * _num_sources)
{
- _free_blocks.enqueue(std::move(next_block));
- }
- } else {
- *eos = true;
- }
- } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -264,14 +259,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state,
vectorized::Block* blo
return Status::OK();
}
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[0].try_dequeue(next_block)) {
- *block = std::move(next_block);
- } else {
- *eos = true;
- }
- } else if (_data_queue[0].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[0].try_dequeue(next_block)) {
*block = std::move(next_block);
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -287,10 +279,14 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state,
vectorized::Block* in_
}
new_block.swap(*in_block);
DCHECK_LE(local_state._channel_id, _data_queue.size());
- add_mem_usage(local_state, new_block.allocated_bytes());
+
+ size_t memory_usage = new_block.allocated_bytes();
+ add_mem_usage(local_state, memory_usage);
if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
+ } else {
+ sub_mem_usage(local_state, memory_usage);
}
if (eos) {
_queue_deps[local_state._channel_id]->set_always_ready();
@@ -350,6 +346,19 @@ Status LocalMergeSortExchanger::get_block(RuntimeState*
state, vectorized::Block
return Status::OK();
}
+void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSinkLocalState&
local_state,
+ int64_t delta) {
+ const auto channel_id = local_state._channel_id;
+ local_state._shared_state->mem_trackers[channel_id]->release(delta);
+ if (_queues_mem_usege[channel_id].fetch_sub(delta) > _each_queue_limit) {
+ _sink_deps[channel_id]->set_ready();
+ }
+ // if queue empty , block this queue
+ if (_queues_mem_usege[channel_id] == 0) {
+ _queue_deps[channel_id]->block();
+ }
+}
+
void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState&
local_state,
int64_t delta) {
const auto channel_id = local_state._channel_id;
@@ -412,14 +421,11 @@ void
BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- *block = std::move(next_block);
- } else {
- *eos = true;
- }
- } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = std::move(next_block);
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -436,9 +442,12 @@ Status
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
- local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
+ size_t memory_usage = new_block.allocated_bytes();
+ local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
+ } else {
+ local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}
return Status::OK();
@@ -494,9 +503,13 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
vectorized::MutableBlock::create_unique(block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
auto new_block = mutable_block->to_block();
- local_state._shared_state->add_mem_usage(i,
new_block.allocated_bytes());
+
+ size_t memory_usage = new_block.allocated_bytes();
+ local_state._shared_state->add_mem_usage(i, memory_usage);
if (data_queue[i].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(i);
+ } else {
+ local_state._shared_state->sub_mem_usage(i, memory_usage);
}
}
}
@@ -519,25 +532,16 @@ Status
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- block->swap(next_block);
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit * _num_sources)
{
- _free_blocks.enqueue(std::move(next_block));
- }
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
- block->allocated_bytes());
- } else {
- *eos = true;
- }
- } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 741b86aa8bb..2c4f8f5b785 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -256,6 +256,7 @@ public:
std::vector<Dependency*> local_state_dependency(int channel_id) override;
void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t
delta);
+ void sub_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t
delta);
void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int
channel_id, int64_t delta);
void close(LocalExchangeSourceLocalState& local_state) override {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]