github-actions[bot] commented on code in PR #44469:
URL: https://github.com/apache/doris/pull/44469#discussion_r1883734068
##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +116,83 @@ size_t
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
return mem_size;
}
-Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState*
state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+ if (!_shared_state->need_to_spill) {
+ if (_shared_state->inner_shared_state) {
+ auto* inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
+ if (inner_sink_state_) {
+ auto* inner_sink_state =
+
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+ COUNTER_SET(_memory_used_counter,
inner_sink_state->_memory_used_counter->value());
+ }
+ }
+ return;
+ }
+
+ int64_t mem_size = 0;
+ auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+ for (auto& block : partitioned_blocks) {
+ if (block) {
+ mem_size += block->allocated_bytes();
+ }
+ }
+ COUNTER_SET(_memory_used_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState*
state, bool eos) {
+ size_t size_to_reserve = 0;
+ auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ if (_shared_state->need_to_spill) {
+ size_to_reserve = p._partition_count *
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+ } else {
+ if (_shared_state->inner_runtime_state) {
+ size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+ _shared_state->inner_runtime_state.get(), eos);
+ }
+ }
+
+ COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+ return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
Review Comment:
warning: function '_revoke_unpartitioned_block' has cognitive complexity of
63 (threshold 50) [readability-function-cognitive-complexity]
```cpp
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:161:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:165:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
if (inner_sink_state) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:174:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
if (inner_sink_state) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:179:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
if (build_block.rows() <= 1) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:182:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
if (spill_context) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:188:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
if (build_block.columns() > num_slots) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:193:**
nesting level increased to 1
```cpp
auto spill_func = [build_block = std::move(build_block), state, this]() mutable {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:201:**
nesting level increased to 2
```cpp
[](std::vector<uint32_t>& indices) { indices.reserve(reserved_size); });
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:205:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
while (offset < total_rows) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:209:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
for (size_t i = 0; i != build_block.columns(); ++i) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:227:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
for (size_t i = 0; i != sub_block.rows(); ++i) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:231:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
for (uint32_t partition_idx = 0; partition_idx != p._partition_count; ++partition_idx) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:237:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
if (UNLIKELY(!partition_block)) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:245:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, end));
^
```
**be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:245:** +5,
including nesting penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, end));
^
```
**be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:250:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
if (partition_block->rows() >= reserved_size || is_last_block) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:250:** +1
```cpp
if (partition_block->rows() >= reserved_size || is_last_block) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:252:** +5,
including nesting penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false));
^
```
**be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:252:** +6,
including nesting penalty of 5, nesting level increased to 6
```cpp
RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false));
^
```
**be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:256:** +1,
nesting level increased to 4
```cpp
} else {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:265:**
nesting level increased to 1
```cpp
auto exception_catch_func = [spill_func]() mutable {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:**
nesting level increased to 2
```cpp
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
^
```
**be/src/common/exception.h:79:** expanded from macro
'RETURN_IF_CATCH_EXCEPTION'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
^
```
**be/src/common/exception.h:84:** expanded from macro
'RETURN_IF_CATCH_EXCEPTION'
```cpp
} catch (const doris::Exception& e) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:** +5,
including nesting penalty of 4, nesting level increased to 5
```cpp
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
^
```
**be/src/common/exception.h:85:** expanded from macro
'RETURN_IF_CATCH_EXCEPTION'
```cpp
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:277:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
DBUG_EXECUTE_IF(
^
```
**be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
```cpp
if (UNLIKELY(config::enable_debug_points)) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:277:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
DBUG_EXECUTE_IF(
^
```
**be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
```cpp
if (dp) { \
^
```
</details>
##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +116,83 @@
return mem_size;
}
-Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState*
state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+ if (!_shared_state->need_to_spill) {
+ if (_shared_state->inner_shared_state) {
+ auto* inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
+ if (inner_sink_state_) {
+ auto* inner_sink_state =
+
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+ COUNTER_SET(_memory_used_counter,
inner_sink_state->_memory_used_counter->value());
+ }
+ }
+ return;
+ }
+
+ int64_t mem_size = 0;
+ auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+ for (auto& block : partitioned_blocks) {
+ if (block) {
+ mem_size += block->allocated_bytes();
+ }
+ }
+ COUNTER_SET(_memory_used_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState*
state, bool eos) {
+ size_t size_to_reserve = 0;
+ auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ if (_shared_state->need_to_spill) {
+ size_to_reserve = p._partition_count *
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+ } else {
+ if (_shared_state->inner_runtime_state) {
+ size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+ _shared_state->inner_runtime_state.get(), eos);
+ }
+ }
+
+ COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+ return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
Review Comment:
warning: function '_revoke_unpartitioned_block' exceeds recommended
size/complexity thresholds [readability-function-size]
```cpp
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:157:** 126
lines including whitespace and comments (threshold 80)
```cpp
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
^
```
</details>
##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -479,12 +521,9 @@
Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block,
Review Comment:
warning: function 'sink' has cognitive complexity of 75 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block,
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:532:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
if (rows == 0) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:533:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
if (eos) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:538:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
if (need_to_spill) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:540:** +1,
nesting level increased to 3
```cpp
} else {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:541:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:542:** +5,
including nesting penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(_setup_internal_operator(state));
^
```
**be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:542:** +6,
including nesting penalty of 5, nesting level increased to 6
```cpp
RETURN_IF_ERROR(_setup_internal_operator(state));
^
```
**be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:544:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
^
```
**be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
```cpp
if (UNLIKELY(config::enable_debug_points)) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:544:** +5,
including nesting penalty of 4, nesting level increased to 5
```cpp
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
^
```
**be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
```cpp
if (dp) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:550:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_inner_sink_operator->sink(
^
```
**be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:550:** +5,
including nesting penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(_inner_sink_operator->sink(
^
```
**be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:561:**
nesting level increased to 3
```cpp
[&](auto& block) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:562:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
if (block) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:572:** +1,
including nesting penalty of 0, nesting level increased to 1
```cpp
if (need_to_spill) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:573:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
^
```
**be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:573:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
^
```
**be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:574:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
if (eos) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:576:** +1,
nesting level increased to 2
```cpp
} else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:579:** +1,
nesting level increased to 1
```cpp
} else {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:580:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:581:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_setup_internal_operator(state));
^
```
**be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:581:** +4,
including nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_setup_internal_operator(state));
^
```
**be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:583:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
^
```
**be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
```cpp
if (UNLIKELY(config::enable_debug_points)) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:583:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
^
```
**be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
```cpp
if (dp) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:588:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_inner_sink_operator->sink(
^
```
**be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:588:** +3,
including nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_inner_sink_operator->sink(
^
```
**be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:591:** +2,
including nesting penalty of 1, nesting level increased to 2
```cpp
if (eos) {
^
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]