Copilot commented on code in PR #61973:
URL: https://github.com/apache/doris/pull/61973#discussion_r3018903682


##########
be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp:
##########
@@ -1027,6 +1027,49 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
revocable_mem_size) {
     ASSERT_EQ(probe_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);
 }
 
+// Partition level >= _repartition_max_depth → revocable_mem_size returns 0.
+TEST_F(PartitionedHashJoinProbeOperatorTest, revocable_mem_size_at_max_depth) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), 
shared_state);
+
+    local_state->_shared_state->_is_spilled = true;
+    local_state->_child_eos = true;
+
+    // Set up a valid current partition with build not finished (the path that
+    // checks level vs max depth).
+    SpillFileSPtr build_file;
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        
->create_spill_file("ut/revocable_join_max_depth_build", build_file)
+                        .ok());
+    SpillFileSPtr probe_file;
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        
->create_spill_file("ut/revocable_join_max_depth_probe", probe_file)
+                        .ok());
+    local_state->_current_partition = JoinSpillPartitionInfo(
+            build_file, probe_file, 
static_cast<int>(probe_operator->_repartition_max_depth));
+    local_state->_current_partition.build_finished = false;
+
+    // Add a recovered build block large enough to exceed the threshold.
+    std::vector<int32_t> large_data(256 * 1024); // 1MB of int32
+    std::iota(large_data.begin(), large_data.end(), 0);
+    Block large_block = ColumnHelper::create_block<DataTypeInt32>(large_data);
+    ASSERT_GE(large_block.allocated_bytes(), 
SpillFile::MIN_SPILL_WRITE_BATCH_MEM);
+    local_state->_recovered_build_block = 
MutableBlock::create_unique(std::move(large_block));
+
+    // At max depth → not revocable.
+    ASSERT_EQ(probe_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);
+
+    // One level below max depth → revocable.
+    local_state->_current_partition.level =
+            static_cast<int>(probe_operator->_repartition_max_depth) - 1;
+    ASSERT_GT(probe_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);

Review Comment:
   This test sets the partition `level` to exactly `_repartition_max_depth` and expects `revocable_mem_size()` to return 0. That level appears to be unreachable in production: repartitioning rejects any `new_level >= _repartition_max_depth`, so the highest attainable level is `_repartition_max_depth - 1`. To validate real behavior (and catch the off-by-one), change the assertions so that `level == _repartition_max_depth - 1` is expected to be non-revocable and `level == _repartition_max_depth - 2` is expected to still be revocable. Alternatively, explicitly set the runtime max depth to a small value and test the boundary conditions.



##########
be/src/exec/operator/partitioned_hash_join_probe_operator.cpp:
##########
@@ -875,6 +875,11 @@ size_t 
PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
         // Or if current partition has finished build hash table.
         return 0;
     }
+    // If the current partition has reached the max repartition depth, it 
cannot be
+    // repartitioned further, so its data is not revocable.
+    if (local_state._current_partition.level >= _repartition_max_depth) {

Review Comment:
   The depth guard is likely off by one relative to the actual repartition limit. The repartition path (see `PartitionedHashJoinProbeLocalState::repartition_current_partition`) rejects `new_level = level + 1` whenever `new_level >= _repartition_max_depth`. A partition at `level == _repartition_max_depth - 1` therefore already cannot be repartitioned and should be treated as non-revocable. Consider aligning this check with the repartition predicate, i.e. return 0 when `level + 1 >= _repartition_max_depth` (equivalently, `level >= _repartition_max_depth - 1`). Otherwise `revoke_memory()` may attempt a repartition that immediately fails.
   ```suggestion
       // If repartitioning the current partition would exceed the max 
repartition depth
       // (i.e., level + 1 >= _repartition_max_depth), it cannot be 
repartitioned further,
       // so its data is not revocable.
       if (local_state._current_partition.level + 1 >= _repartition_max_depth) {
   ```



##########
be/src/exec/operator/partitioned_aggregation_source_operator.cpp:
##########
@@ -191,6 +191,11 @@ size_t 
PartitionedAggSourceOperatorX::revocable_mem_size(RuntimeState* state) co
     if (!local_state._shared_state->_is_spilled || 
!local_state._current_partition.spill_file) {
         return 0;
     }
+    // If the current partition has reached the max repartition depth, it 
cannot be
+    // repartitioned further, so its data is not revocable.
+    if (local_state._current_partition.level >= _repartition_max_depth) {

Review Comment:
   This has the same off-by-one issue as the hash join. `_flush_and_repartition` fails when `new_level = level + 1 >= _repartition_max_depth`, so a partition at `level == _repartition_max_depth - 1` cannot be repartitioned and its memory is not revocable. The current `level >= _repartition_max_depth` guard therefore does not stop futile revoke attempts at the last allowed level. Update the condition in `revocable_mem_size()` to match the repartition limit predicate.
   ```suggestion
       // If repartitioning this partition would exceed the max repartition 
depth, it cannot be
       // repartitioned further, so its data is not revocable.
       if (local_state._current_partition.level + 1 >= _repartition_max_depth) {
   ```



##########
be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp:
##########
@@ -737,6 +737,42 @@ TEST_F(PartitionedAggregationSourceOperatorTest, 
RevocableMemSizeWithAggContaine
     
EXPECT_EQ(source_operator->revocable_mem_size(_helper.runtime_state.get()), 
container_bytes);
 }
 
+// Condition: partition level >= _repartition_max_depth → 0 (not revocable).
+TEST_F(PartitionedAggregationSourceOperatorTest, 
RevocableMemSizeAtMaxDepthReturnsZero) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    const auto tnode = _helper.create_test_plan_node();
+    ASSERT_TRUE(source_operator->init(tnode, 
_helper.runtime_state.get()).ok());
+    ASSERT_TRUE(source_operator->prepare(_helper.runtime_state.get()).ok());
+
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = 
_helper.create_source_local_state(_helper.runtime_state.get(),
+                                                          
source_operator.get(), shared_state);
+
+    // Add a large block so that without the depth check, revocable_mem_size 
would be > 0.
+    std::vector<int32_t> large_data(300000);
+    std::iota(large_data.begin(), large_data.end(), 0);
+    auto large_block = ColumnHelper::create_block<DataTypeInt32>(large_data);
+    ASSERT_GT(large_block.allocated_bytes(), 1UL << 20);
+    local_state->_blocks.push_back(std::move(large_block));
+
+    SpillFileSPtr spill_file;
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file("ut/revocable_max_depth", 
spill_file)
+                        .ok());
+    local_state->_current_partition.spill_file = spill_file;
+
+    // Set partition level to max depth → not revocable.
+    local_state->_current_partition.level =
+            static_cast<int>(source_operator->_repartition_max_depth);
+    
EXPECT_EQ(source_operator->revocable_mem_size(_helper.runtime_state.get()), 
0UL);
+
+    // Also verify level == max_depth - 1 IS still revocable.
+    local_state->_current_partition.level =
+            static_cast<int>(source_operator->_repartition_max_depth) - 1;
+    
EXPECT_GT(source_operator->revocable_mem_size(_helper.runtime_state.get()), 
0UL);
+}

Review Comment:
   The boundary levels asserted here do not match the operator's repartition limit. `_flush_and_repartition` fails when `new_level = level + 1 >= _repartition_max_depth`, so `level == _repartition_max_depth - 1` is already at the maximum depth and should be treated as non-revocable. Setting `level == _repartition_max_depth` is likely an unreachable state. The test should instead cover `max_depth - 1` (expect 0) versus `max_depth - 2` (expect > 0). Alternatively, configure the runtime max depth so the edge case is exercised deterministically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to