This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 96b9388218b [opt](explode) Optimize explode_outer and posexplode 
(#62069) (#62145)
96b9388218b is described below

commit 96b9388218be239ad1698ccfdce762fb50e9ac82
Author: TengJianPing <[email protected]>
AuthorDate: Tue Apr 7 18:53:50 2026 +0800

    [opt](explode) Optimize explode_outer and posexplode (#62069) (#62145)
    
    Issue Number: Pick #62069
    
    Related PR: #xxx
    
    Problem Summary:
    
    Improvement of #60352, optimize explode_outer, posexplode and
    posexplode_outer.
    
    The approach is a single-pass algorithm: walk through the child rows,
    accumulating contiguous segments into the output; when hitting a
    null/empty row or reaching the end, flush the segment using bulk
    operations. For outer-null rows, insert a NULL and copy the
    non-table-function columns directly. This naturally handles both outer
    and non-outer modes, since non-outer mode simply never produces any
    null outputs. For posexplode, position indices are generated alongside.
    
    Performance test results:
    Create and populate a large test table. Use 100,000,000 base rows, each
    with an array of 10 INT elements, producing 1,000,000,000 total exploded
    output rows.
    
    | Function | slow path | optimized | SpeedUp|
    | --------- |---------- |-----------|----------|
    | explode | 9734 ms | 5105 ms |90%|
    | explode_outer | 9685 ms | 5064 ms |91%|
    | posexplode | 11021 ms | 5963 ms |84%|
    | posexplode_outer | 14088 ms | 5903 ms |138%|
    
    None
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason? -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/exec/operator/operator.h                    |   1 -
 be/src/exec/operator/table_function_operator.cpp   | 210 +++++--
 be/src/exprs/table_function/table_function.h       |   1 +
 be/src/exprs/table_function/vexplode.cpp           |   2 +-
 be/src/exprs/table_function/vexplode_v2.cpp        |   3 +-
 .../exec/operator/analytic_sink_operator_test.cpp  |   2 +-
 ...istinct_streaming_aggregation_operator_test.cpp |   2 +-
 .../operator/exchange_source_operator_test.cpp     |   2 +-
 .../operator/partition_sort_sink_operator_test.cpp |   2 +-
 .../exec/operator/query_cache_operator_test.cpp    |   2 +-
 be/test/exec/operator/repeat_operator_test.cpp     |   2 +-
 be/test/exec/operator/set_operator_test.cpp        |   4 +-
 be/test/exec/operator/sort_operator_test.cpp       |   2 +-
 .../exec/operator/table_function_operator_test.cpp | 639 ++++++++++++++++++++-
 be/test/exec/operator/union_operator_test.cpp      |   6 +-
 .../exec/sink/arrow_result_block_buffer_test.cpp   |   8 +-
 be/test/exec/sink/result_block_buffer_test.cpp     |   6 +-
 be/test/testutil/mock/mock_runtime_state.h         |   4 +-
 18 files changed, 820 insertions(+), 78 deletions(-)

diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index 8c880b24331..2f403d275fd 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -805,7 +805,6 @@ public:
               _resource_profile(tnode.resource_profile),
               _limit(tnode.limit) {
         if (tnode.__isset.output_tuple_id) {
-            _output_row_descriptor.reset(new RowDescriptor(descs, 
{tnode.output_tuple_id}, {true}));
             _output_row_descriptor = std::make_unique<RowDescriptor>(
                     descs, std::vector {tnode.output_tuple_id}, std::vector 
{true});
         }
diff --git a/be/src/exec/operator/table_function_operator.cpp 
b/be/src/exec/operator/table_function_operator.cpp
index 18a63571043..7be0c1d35ef 100644
--- a/be/src/exec/operator/table_function_operator.cpp
+++ b/be/src/exec/operator/table_function_operator.cpp
@@ -28,6 +28,8 @@
 #include "core/block/block.h"
 #include "core/block/column_numbers.h"
 #include "core/column/column_nullable.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
 #include "core/custom_allocator.h"
 #include "exec/operator/operator.h"
 #include "exprs/table_function/table_function_factory.h"
@@ -264,30 +266,156 @@ Status 
TableFunctionLocalState::_get_expanded_block_block_fast_path(
     const auto& offsets = *_block_fast_path_ctx.offsets_ptr;
     const auto child_rows = cast_set<int64_t>(offsets.size());
 
-    std::vector<uint32_t> row_ids;
-    row_ids.reserve(remaining_capacity);
-    uint64_t first_nested_idx = 0;
-    uint64_t expected_next_nested_idx = 0;
-    bool found_nested_range = false;
-
     int64_t child_row = _block_fast_path_row;
     uint64_t in_row_offset = _block_fast_path_in_row_offset;
     int produced_rows = 0;
 
-    while (produced_rows < remaining_capacity && child_row < child_rows) {
-        if (_block_fast_path_ctx.array_nullmap_data &&
-            _block_fast_path_ctx.array_nullmap_data[child_row]) {
-            // NULL array row: skip it here. Slow path will handle output 
semantics if needed.
-            child_row++;
-            in_row_offset = 0;
-            continue;
+    const bool is_outer = _fns[0]->is_outer();
+    const bool is_posexplode = _block_fast_path_ctx.generate_row_index;
+    auto& out_col = columns[p._child_slots.size()];
+
+    // Decompose posexplode struct output column if needed
+    ColumnStruct* struct_col_ptr = nullptr;
+    ColumnUInt8* outer_struct_nullmap_ptr = nullptr;
+    IColumn* value_col_ptr = nullptr;
+    ColumnInt32* pos_col_ptr = nullptr;
+    if (is_posexplode) {
+        if (out_col->is_nullable()) {
+            auto* nullable = assert_cast<ColumnNullable*>(out_col.get());
+            struct_col_ptr = 
assert_cast<ColumnStruct*>(nullable->get_nested_column_ptr().get());
+            outer_struct_nullmap_ptr =
+                    
assert_cast<ColumnUInt8*>(nullable->get_null_map_column_ptr().get());
+        } else {
+            struct_col_ptr = assert_cast<ColumnStruct*>(out_col.get());
+        }
+        pos_col_ptr = 
assert_cast<ColumnInt32*>(&struct_col_ptr->get_column(0));
+        value_col_ptr = &struct_col_ptr->get_column(1);
+    }
+    // Segment tracking: accumulate contiguous nested ranges, flush on 
boundaries.
+    // Array column offsets are monotonically non-decreasing, so nested data 
across child rows
+    // is always contiguous (even with NULL/empty rows that contribute zero 
elements).
+    struct ExpandSegmentContext {
+        std::vector<uint32_t>
+                seg_row_ids; // row ids of non table-function columns to 
replicate for this segment
+        std::vector<int32_t>
+                seg_positions; // for posexplode, the position values to write 
for this segment
+        int64_t seg_nested_start = -1; // start offset in the nested column of 
this segment
+        int seg_nested_count =
+                0; // number of nested rows in this segment (can be > child 
row count due to multiple elements per row)
+    };
+    ExpandSegmentContext segment_ctx;
+    segment_ctx.seg_row_ids.reserve(remaining_capacity);
+    if (is_posexplode) {
+        segment_ctx.seg_positions.reserve(remaining_capacity);
+    }
+
+    auto reset_expand_segment_ctx = [&segment_ctx, is_posexplode]() {
+        segment_ctx.seg_nested_start = -1;
+        segment_ctx.seg_nested_count = 0;
+        segment_ctx.seg_row_ids.clear();
+        if (is_posexplode) {
+            segment_ctx.seg_positions.clear();
+        }
+    };
+
+    // Flush accumulated contiguous segment to output columns
+    auto flush_segment = [&]() {
+        if (segment_ctx.seg_nested_count == 0) {
+            return;
+        }
+
+        // Non-TF columns: replicate each child row for every output element
+        for (auto index : p._output_slot_indexs) {
+            auto src_column = _child_block->get_by_position(index).column;
+            columns[index]->insert_indices_from(
+                    *src_column, segment_ctx.seg_row_ids.data(),
+                    segment_ctx.seg_row_ids.data() + 
segment_ctx.seg_row_ids.size());
+        }
+
+        if (is_posexplode) {
+            // Write positions
+            pos_col_ptr->insert_many_raw_data(
+                    reinterpret_cast<const 
char*>(segment_ctx.seg_positions.data()),
+                    segment_ctx.seg_positions.size());
+            // Write nested values to the struct's value sub-column
+            DCHECK(value_col_ptr->is_nullable())
+                    << "posexplode fast path requires nullable value column";
+            auto* val_nullable = assert_cast<ColumnNullable*>(value_col_ptr);
+            val_nullable->get_nested_column_ptr()->insert_range_from(
+                    *_block_fast_path_ctx.nested_col, 
segment_ctx.seg_nested_start,
+                    segment_ctx.seg_nested_count);
+            auto* val_nullmap =
+                    
assert_cast<ColumnUInt8*>(val_nullable->get_null_map_column_ptr().get());
+            auto& val_nullmap_data = val_nullmap->get_data();
+            const size_t old_size = val_nullmap_data.size();
+            val_nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
+            if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
+                memcpy(val_nullmap_data.data() + old_size,
+                       _block_fast_path_ctx.nested_nullmap_data + 
segment_ctx.seg_nested_start,
+                       segment_ctx.seg_nested_count * sizeof(UInt8));
+            } else {
+                memset(val_nullmap_data.data() + old_size, 0,
+                       segment_ctx.seg_nested_count * sizeof(UInt8));
+            }
+            // Struct-level null map: these rows are not null
+            if (outer_struct_nullmap_ptr) {
+                
outer_struct_nullmap_ptr->insert_many_defaults(segment_ctx.seg_nested_count);
+            }
+        } else if (out_col->is_nullable()) {
+            auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
+            out_nullable->get_nested_column_ptr()->insert_range_from(
+                    *_block_fast_path_ctx.nested_col, 
segment_ctx.seg_nested_start,
+                    segment_ctx.seg_nested_count);
+            auto* nullmap_column =
+                    
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
+            auto& nullmap_data = nullmap_column->get_data();
+            const size_t old_size = nullmap_data.size();
+            nullmap_data.resize(old_size + segment_ctx.seg_nested_count);
+            if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
+                memcpy(nullmap_data.data() + old_size,
+                       _block_fast_path_ctx.nested_nullmap_data + 
segment_ctx.seg_nested_start,
+                       segment_ctx.seg_nested_count * sizeof(UInt8));
+            } else {
+                memset(nullmap_data.data() + old_size, 0,
+                       segment_ctx.seg_nested_count * sizeof(UInt8));
+            }
+        } else {
+            out_col->insert_range_from(*_block_fast_path_ctx.nested_col,
+                                       segment_ctx.seg_nested_start, 
segment_ctx.seg_nested_count);
         }
+        reset_expand_segment_ctx();
+    };
+
+    // Emit one NULL output row for an outer-null/empty child row
+    auto emit_outer_null = [&](int64_t cr) {
+        for (auto index : p._output_slot_indexs) {
+            auto src_column = _child_block->get_by_position(index).column;
+            columns[index]->insert_from(*src_column, cr);
+        }
+        out_col->insert_default();
+    };
+    // Walk through child rows, accumulating contiguous segments into the 
output,
+    // then when hitting a null/empty row or reaching the end,
+    // flush the segment using bulk operations.
+    // For outer-null rows, insert a NULL and copy the non-table-function 
columns directly.
+    // This naturally handles both outer and non-outer modes since non-outer 
mode
+    // just won't produce any null outputs.
+    // For posexplode, generate position indices alongside this.
+    while (produced_rows < remaining_capacity && child_row < child_rows) {
+        const bool is_null_row = _block_fast_path_ctx.array_nullmap_data &&
+                                 
_block_fast_path_ctx.array_nullmap_data[child_row];
 
         const uint64_t prev_off = child_row == 0 ? 0 : offsets[child_row - 1];
-        const uint64_t cur_off = offsets[child_row];
+        const uint64_t cur_off = is_null_row ? prev_off : offsets[child_row];
         const uint64_t nested_len = cur_off - prev_off;
 
-        if (in_row_offset >= nested_len) {
+        if (is_null_row || in_row_offset >= nested_len) {
+            // for outer functions, emit null row for NULL or empty array rows
+            if (is_outer && in_row_offset == 0 && (is_null_row || nested_len 
== 0)) {
+                flush_segment();
+                emit_outer_null(child_row);
+                produced_rows++;
+            }
             child_row++;
             in_row_offset = 0;
             continue;
@@ -301,21 +429,28 @@ Status 
TableFunctionLocalState::_get_expanded_block_block_fast_path(
         DCHECK_LE(nested_start + take_count, cur_off);
         DCHECK_LE(nested_start + take_count, 
_block_fast_path_ctx.nested_col->size());
 
-        if (!found_nested_range) {
-            found_nested_range = true;
-            first_nested_idx = nested_start;
-            expected_next_nested_idx = nested_start;
+        if (segment_ctx.seg_nested_count == 0) {
+            segment_ctx.seg_nested_start = nested_start;
+        } else {
+            // Nested data from an array column is always contiguous: offsets 
are monotonically
+            // non-decreasing, so skipping NULL/empty rows doesn't create gaps.
+            DCHECK_EQ(static_cast<uint64_t>(segment_ctx.seg_nested_start +
+                                            segment_ctx.seg_nested_count),
+                      nested_start)
+                    << "nested data must be contiguous across child rows";
         }
-        DCHECK_EQ(nested_start, expected_next_nested_idx);
 
         // Map each produced output row back to its source child row for 
copying non-table-function
         // columns via insert_indices_from().
         for (int j = 0; j < take_count; ++j) {
-            row_ids.push_back(cast_set<uint32_t>(child_row));
+            segment_ctx.seg_row_ids.push_back(cast_set<uint32_t>(child_row));
+            if (is_posexplode) {
+                
segment_ctx.seg_positions.push_back(cast_set<int32_t>(in_row_offset + j));
+            }
         }
 
+        segment_ctx.seg_nested_count += take_count;
         produced_rows += take_count;
-        expected_next_nested_idx += take_count;
         in_row_offset += take_count;
         if (in_row_offset >= nested_len) {
             child_row++;
@@ -323,35 +458,8 @@ Status 
TableFunctionLocalState::_get_expanded_block_block_fast_path(
         }
     }
 
-    if (produced_rows > 0) {
-        for (auto index : p._output_slot_indexs) {
-            auto src_column = _child_block->get_by_position(index).column;
-            columns[index]->insert_indices_from(*src_column, row_ids.data(),
-                                                row_ids.data() + 
produced_rows);
-        }
-
-        auto& out_col = columns[p._child_slots.size()];
-        if (out_col->is_nullable()) {
-            auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get());
-            out_nullable->get_nested_column_ptr()->insert_range_from(
-                    *_block_fast_path_ctx.nested_col, first_nested_idx, 
produced_rows);
-            auto* nullmap_column =
-                    
assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get());
-            auto& nullmap_data = nullmap_column->get_data();
-            const size_t old_size = nullmap_data.size();
-            nullmap_data.resize(old_size + produced_rows);
-            if (_block_fast_path_ctx.nested_nullmap_data != nullptr) {
-                memcpy(nullmap_data.data() + old_size,
-                       _block_fast_path_ctx.nested_nullmap_data + 
first_nested_idx,
-                       produced_rows * sizeof(UInt8));
-            } else {
-                memset(nullmap_data.data() + old_size, 0, produced_rows * 
sizeof(UInt8));
-            }
-        } else {
-            out_col->insert_range_from(*_block_fast_path_ctx.nested_col, 
first_nested_idx,
-                                       produced_rows);
-        }
-    }
+    // Flush any remaining segment
+    flush_segment();
 
     _block_fast_path_row = child_row;
     _block_fast_path_in_row_offset = in_row_offset;
diff --git a/be/src/exprs/table_function/table_function.h 
b/be/src/exprs/table_function/table_function.h
index be800080148..3c4bb7f1c7d 100644
--- a/be/src/exprs/table_function/table_function.h
+++ b/be/src/exprs/table_function/table_function.h
@@ -40,6 +40,7 @@ public:
         const IColumn::Offsets64* offsets_ptr = nullptr;
         ColumnPtr nested_col = nullptr;
         const UInt8* nested_nullmap_data = nullptr;
+        bool generate_row_index = false;
     };
 
     virtual Status prepare() { return Status::OK(); }
diff --git a/be/src/exprs/table_function/vexplode.cpp 
b/be/src/exprs/table_function/vexplode.cpp
index 07fa4d4355e..5fcad11f94c 100644
--- a/be/src/exprs/table_function/vexplode.cpp
+++ b/be/src/exprs/table_function/vexplode.cpp
@@ -94,7 +94,7 @@ Status VExplodeTableFunction::process_init(Block* block, 
RuntimeState* state) {
 }
 
 bool VExplodeTableFunction::support_block_fast_path() const {
-    return !_is_outer;
+    return true;
 }
 
 Status VExplodeTableFunction::prepare_block_fast_path(Block* /*block*/, 
RuntimeState* /*state*/,
diff --git a/be/src/exprs/table_function/vexplode_v2.cpp 
b/be/src/exprs/table_function/vexplode_v2.cpp
index 9aa42a84444..55be4fdf29a 100644
--- a/be/src/exprs/table_function/vexplode_v2.cpp
+++ b/be/src/exprs/table_function/vexplode_v2.cpp
@@ -109,7 +109,7 @@ Status VExplodeV2TableFunction::process_init(Block* block, 
RuntimeState* state)
 }
 
 bool VExplodeV2TableFunction::support_block_fast_path() const {
-    return !_is_outer && !_generate_row_index && _multi_detail.size() == 1;
+    return _multi_detail.size() == 1;
 }
 
 Status VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, 
RuntimeState* /*state*/,
@@ -123,6 +123,7 @@ Status 
VExplodeV2TableFunction::prepare_block_fast_path(Block* /*block*/, Runtim
     ctx->offsets_ptr = detail.offsets_ptr;
     ctx->nested_col = detail.nested_col;
     ctx->nested_nullmap_data = detail.nested_nullmap_data;
+    ctx->generate_row_index = _generate_row_index;
     return Status::OK();
 }
 
diff --git a/be/test/exec/operator/analytic_sink_operator_test.cpp 
b/be/test/exec/operator/analytic_sink_operator_test.cpp
index 6729d6759da..a64d16c676f 100644
--- a/be/test/exec/operator/analytic_sink_operator_test.cpp
+++ b/be/test/exec/operator/analytic_sink_operator_test.cpp
@@ -56,7 +56,7 @@ struct AnalyticSinkOperatorTest : public ::testing::Test {
         sink = std::make_unique<AnalyticSinkOperatorX>(&pool);
         source = std::make_unique<AnalyticSourceOperatorX>();
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = batch_size;
+        state->_batch_size = batch_size;
         std::cout << "AnalyticSinkOperatorTest::SetUp() batch_size: " << 
batch_size << std::endl;
         _child_op = std::make_unique<MockAnalyticSinkOperator>();
         for (int i = 0; i < batch_size; i++) {
diff --git 
a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp 
b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp
index b123cb60e34..88434e47fd7 100644
--- a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp
+++ b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp
@@ -34,7 +34,7 @@ struct DistinctStreamingAggOperatorTest : public 
::testing::Test {
         op = std::make_unique<DistinctStreamingAggOperatorX>();
         mock_op = std::make_shared<MockOperatorX>();
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 10;
+        state->_batch_size = 10;
         op->_child = mock_op;
     }
 
diff --git a/be/test/exec/operator/exchange_source_operator_test.cpp 
b/be/test/exec/operator/exchange_source_operator_test.cpp
index 543f5eafaa2..1f7abc4e00f 100644
--- a/be/test/exec/operator/exchange_source_operator_test.cpp
+++ b/be/test/exec/operator/exchange_source_operator_test.cpp
@@ -68,7 +68,7 @@ struct MockExchangeSourceLocalState : public 
ExchangeLocalState {
 struct ExchangeSourceOperatorXTest : public ::testing::Test {
     void SetUp() override {
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 10;
+        state->_batch_size = 10;
     }
 
     void create_op(int num_senders, bool is_merging, int offset, int limit) {
diff --git a/be/test/exec/operator/partition_sort_sink_operator_test.cpp 
b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
index 5dc667bbae2..fa42cc7576b 100644
--- a/be/test/exec/operator/partition_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
@@ -51,7 +51,7 @@ private:
 struct PartitionSortOperatorTest : public ::testing::Test {
     void SetUp() override {
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 10;
+        state->_batch_size = 10;
         _child_op = std::make_unique<PartitionSortOperatorMockOperator>();
     }
 
diff --git a/be/test/exec/operator/query_cache_operator_test.cpp 
b/be/test/exec/operator/query_cache_operator_test.cpp
index 7f941281fe6..a99e9bcb9d9 100644
--- a/be/test/exec/operator/query_cache_operator_test.cpp
+++ b/be/test/exec/operator/query_cache_operator_test.cpp
@@ -50,7 +50,7 @@ private:
 struct QueryCacheOperatorTest : public ::testing::Test {
     void SetUp() override {
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 10;
+        state->_batch_size = 10;
         child_op = std::make_unique<QueryCacheMockChildOperator>();
         query_cache_uptr.reset(QueryCache::create_global_cache(1024 * 1024 * 
1024));
         query_cache = query_cache_uptr.get();
diff --git a/be/test/exec/operator/repeat_operator_test.cpp 
b/be/test/exec/operator/repeat_operator_test.cpp
index 781f14cc246..80363b8adf0 100644
--- a/be/test/exec/operator/repeat_operator_test.cpp
+++ b/be/test/exec/operator/repeat_operator_test.cpp
@@ -34,7 +34,7 @@ struct RepeatOperatorTest : public ::testing::Test {
         op = std::make_unique<RepeatOperatorX>();
         mock_op = std::make_shared<MockOperatorX>();
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 10;
+        state->_batch_size = 10;
         op->_child = mock_op;
     }
 
diff --git a/be/test/exec/operator/set_operator_test.cpp 
b/be/test/exec/operator/set_operator_test.cpp
index d85e7a04c06..689f913e0e0 100644
--- a/be/test/exec/operator/set_operator_test.cpp
+++ b/be/test/exec/operator/set_operator_test.cpp
@@ -42,7 +42,7 @@ template <bool is_intersect>
 struct SetOperatorTest : public ::testing::Test {
     void SetUp() override {
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 5;
+        state->_batch_size = 5;
     }
 
     void init_op(int child_size, DataTypes output_type) {
@@ -352,7 +352,7 @@ TEST_F(ExceptOperatorTest, test_build_not_ignore_null) {
 TEST_F(ExceptOperatorTest, test_output_null_batsh_size) {
     init_op(2, 
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});
 
-    state->batsh_size = 3; // set batch size to 3
+    state->_batch_size = 3; // set batch size to 3
     sink_op->_child_exprs = MockSlotRef::create_mock_contexts(
             DataTypes 
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});
     probe_sink_ops[0]->_child_exprs = MockSlotRef::create_mock_contexts(
diff --git a/be/test/exec/operator/sort_operator_test.cpp 
b/be/test/exec/operator/sort_operator_test.cpp
index 4bb866242ab..a954d2d4898 100644
--- a/be/test/exec/operator/sort_operator_test.cpp
+++ b/be/test/exec/operator/sort_operator_test.cpp
@@ -50,7 +50,7 @@ private:
 struct SortOperatorTest : public ::testing::Test {
     void SetUp() override {
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 10;
+        state->_batch_size = 10;
         _child_op = std::make_unique<MockOperator>();
     }
 
diff --git a/be/test/exec/operator/table_function_operator_test.cpp 
b/be/test/exec/operator/table_function_operator_test.cpp
index b1ce616aaf7..bd7b108b18a 100644
--- a/be/test/exec/operator/table_function_operator_test.cpp
+++ b/be/test/exec/operator/table_function_operator_test.cpp
@@ -28,6 +28,7 @@
 #include "core/block/block.h"
 #include "core/column/column_array.h"
 #include "core/column/column_nullable.h"
+#include "core/column/column_struct.h"
 #include "core/data_type/data_type_array.h"
 #include "core/data_type/data_type_nullable.h"
 #include "exec/operator/operator_helper.h"
@@ -412,7 +413,7 @@ TEST_F(TableFunctionOperatorTest, block_fast_path_explode) {
 }
 
 TEST_F(TableFunctionOperatorTest, block_fast_path_explode_batch_truncate) {
-    state->batsh_size = 2;
+    state->_batch_size = 2;
     bool get_value_called = false;
     auto int_type = std::make_shared<DataTypeInt32>();
     auto arr_type = std::make_shared<DataTypeArray>(int_type);
@@ -655,7 +656,7 @@ TEST_F(TableFunctionOperatorTest, 
block_fast_path_explode_nullable_array_misalig
 
 TEST_F(TableFunctionOperatorTest,
        block_fast_path_explode_nullable_array_partial_gap_uses_slow_path) {
-    state->batsh_size = 2;
+    state->_batch_size = 2;
     bool get_value_called = false;
     auto int_type = std::make_shared<DataTypeInt32>();
     auto arr_type = 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeArray>(int_type));
@@ -1081,7 +1082,7 @@ struct UnnestTest : public ::testing::Test {
         obj_pool = std::make_unique<ObjectPool>();
         query_ctx = generate_one_query();
         runtime_profile = std::make_shared<RuntimeProfile>("test");
-        runtime_state->batsh_size = 5;
+        runtime_state->_batch_size = 5;
         runtime_state->_query_ctx = query_ctx.get();
         runtime_state->_query_id = query_ctx->query_id();
         runtime_state->resize_op_id_to_local_state(-100);
@@ -1237,6 +1238,98 @@ struct UnnestTest : public ::testing::Test {
         // end of tplan_node_table_function_node
     }
 
+    void setup_explode_plan_node(TPlanNode& tplan_node_table_function_node, 
bool outer) {
+        // Set main TPlanNode properties
+        tplan_node_table_function_node.node_id = 0;
+        tplan_node_table_function_node.node_type = 
TPlanNodeType::TABLE_FUNCTION_NODE;
+        tplan_node_table_function_node.num_children = 1;
+        tplan_node_table_function_node.limit = -1;
+
+        tplan_node_table_function_node.row_tuples.push_back(tuple0.id);
+        tplan_node_table_function_node.row_tuples.push_back(tuple1.id);
+        tplan_node_table_function_node.nullable_tuples.push_back(false);
+        tplan_node_table_function_node.nullable_tuples.push_back(false);
+        tplan_node_table_function_node.compact_data = false;
+        tplan_node_table_function_node.is_serial_operator = false;
+
+        // setup table function node
+        
tplan_node_table_function_node.__set_table_function_node(TTableFunctionNode());
+
+        // setup fnCallExprList of table function node
+        TExpr fn_expr;
+
+        fn_expr.nodes.emplace_back();
+        fn_expr.nodes[0].node_type = TExprNodeType::FUNCTION_CALL;
+        fn_expr.nodes[0].type.types.emplace_back(type_node_int);
+        fn_expr.nodes[0].num_children = 1;
+
+        // setup TFunction of table function node
+        fn_expr.nodes[0].__set_fn(TFunction());
+        fn_expr.nodes[0].fn.__set_name(TFunctionName());
+        fn_expr.nodes[0].fn.name.function_name = outer ? "explode_outer" : 
"explode";
+
+        fn_expr.nodes[0].fn.arg_types.emplace_back(type_desc_array_int);
+        fn_expr.nodes[0].fn.ret_type.types.emplace_back(type_node_int);
+
+        fn_expr.nodes[0].fn.has_var_args = false;
+        fn_expr.nodes[0].fn.signature = outer ? "explode_outer(array<int>)" : 
"explode(array<int>)";
+        fn_expr.nodes[0].fn.__set_scalar_fn(TScalarFunction());
+        fn_expr.nodes[0].fn.scalar_fn.symbol = "";
+        fn_expr.nodes[0].fn.id = 0;
+        fn_expr.nodes[0].fn.vectorized = true;
+        fn_expr.nodes[0].fn.is_udtf_function = false;
+        fn_expr.nodes[0].fn.is_static_load = false;
+        fn_expr.nodes[0].fn.expiration_time = 360;
+        fn_expr.nodes[0].is_nullable = true;
+
+        // explode input slot ref: array<int>
+        fn_expr.nodes.emplace_back();
+        fn_expr.nodes[1].node_type = TExprNodeType::SLOT_REF;
+        fn_expr.nodes[1].type = type_desc_array_int;
+        fn_expr.nodes[1].num_children = 0;
+        fn_expr.nodes[1].__set_slot_ref(TSlotRef());
+        fn_expr.nodes[1].slot_ref = slot_ref_tags;
+        fn_expr.nodes[1].output_scale = -1;
+        fn_expr.nodes[1].is_nullable = true;
+        fn_expr.nodes[1].label = "tags";
+
+        
tplan_node_table_function_node.table_function_node.fnCallExprList.push_back(fn_expr);
+
+        // Set output slot IDs
+        
tplan_node_table_function_node.table_function_node.outputSlotIds.push_back(slot_desc_id.id);
+        
tplan_node_table_function_node.table_function_node.outputSlotIds.push_back(
+                slot_desc_unnest_tag.id);
+
+        // Set projections
+        TExpr texpr_proj0;
+        texpr_proj0.nodes.emplace_back();
+        texpr_proj0.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        texpr_proj0.nodes[0].type = type_desc_int;
+        texpr_proj0.nodes[0].num_children = 0;
+        texpr_proj0.nodes[0].__set_slot_ref(TSlotRef());
+        texpr_proj0.nodes[0].slot_ref = slot_ref_id;
+        texpr_proj0.nodes[0].output_scale = -1;
+        texpr_proj0.nodes[0].is_nullable = true;
+        texpr_proj0.nodes[0].label = "id";
+
+        TExpr texpr_proj1;
+        texpr_proj1.nodes.emplace_back();
+        texpr_proj1.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        texpr_proj1.nodes[0].type = type_desc_int;
+        texpr_proj1.nodes[0].num_children = 0;
+        texpr_proj1.nodes[0].__set_slot_ref(TSlotRef());
+        texpr_proj1.nodes[0].slot_ref = slot_ref_unnest_tag;
+        texpr_proj1.nodes[0].output_scale = -1;
+        texpr_proj1.nodes[0].is_nullable = true;
+        texpr_proj1.nodes[0].label = "tag";
+
+        tplan_node_table_function_node.projections.push_back(texpr_proj0);
+        tplan_node_table_function_node.projections.push_back(texpr_proj1);
+        tplan_node_table_function_node.output_tuple_id = tuple2.id;
+        tplan_node_table_function_node.nereids_id = 144;
+        // end of tplan_node_table_function_node
+    }
+
     std::shared_ptr<TableFunctionOperatorX> create_test_operators(
             DescriptorTbl* desc_tbl, const TPlanNode& 
tplan_node_table_function_node) {
         runtime_state->set_desc_tbl(desc_tbl);
@@ -1529,4 +1622,544 @@ TEST_F(UnnestTest, outer) {
         EXPECT_TRUE(ColumnHelper::block_equal(output_block, 
expected_output_block));
     }
 }
+
+// Test inner mode fast path with NULL and empty arrays (they should be 
skipped)
+TEST_F(UnnestTest, inner_with_nulls_fast_path) {
+    TDescriptorTable desc_table;
+    desc_table.tupleDescriptors.push_back(tuple0);
+    desc_table.tupleDescriptors.push_back(tuple1);
+    desc_table.tupleDescriptors.push_back(tuple2);
+    desc_table.slotDescriptors.push_back(slot_desc_id);
+    desc_table.slotDescriptors.push_back(slot_desc_tags);
+    desc_table.slotDescriptors.push_back(slot_desc_unnest_tag);
+    desc_table.slotDescriptors.push_back(slot_desc_id_2);
+    desc_table.slotDescriptors.push_back(slot_desc_unnest_tag2);
+
+    setup_exec_env();
+    runtime_state->_batch_size = 4096;
+
+    TPlanNode tplan_node;
+    setup_explode_plan_node(tplan_node, false); // inner, no conjuncts → fast 
path
+
+    DescriptorTbl* desc_tbl;
+    auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+    EXPECT_TRUE(st.ok());
+
+    DataTypePtr data_type_int(std::make_shared<DataTypeInt32>());
+    auto data_type_int_nullable = make_nullable(data_type_int);
+    DataTypePtr 
data_type_array_type(std::make_shared<DataTypeArray>(data_type_int_nullable));
+    auto data_type_array_type_nullable = make_nullable(data_type_array_type);
+
+    // Build input: ids=[1,2,3,4,5], arrays=[[10,20,30], NULL, [40,50], [], 
[60]]
+    auto build_input = [&]() {
+        auto result_block = std::make_unique<Block>();
+
+        auto id_column = ColumnInt32::create();
+        for (int32_t id : {1, 2, 3, 4, 5}) {
+            id_column->insert_data((const char*)(&id), 0);
+        }
+        
result_block->insert(ColumnWithTypeAndName(make_nullable(std::move(id_column)),
+                                                   data_type_int_nullable, 
"id"));
+
+        auto arr_data = ColumnInt32::create();
+        auto arr_offsets = ColumnOffset64::create();
+        auto arr_nullmap = ColumnUInt8::create();
+
+        // Row 0: [10, 20, 30]
+        for (int32_t v : {10, 20, 30}) {
+            arr_data->insert_data((const char*)(&v), 0);
+        }
+        ColumnArray::Offset64 off = 3;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 1: NULL
+        arr_offsets->insert_data((const char*)(&off), 0); // same offset
+        arr_nullmap->get_data().push_back(1);
+
+        // Row 2: [40, 50]
+        for (int32_t v : {40, 50}) {
+            arr_data->insert_data((const char*)(&v), 0);
+        }
+        off = 5;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 3: []
+        arr_offsets->insert_data((const char*)(&off), 0); // same offset
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 4: [60]
+        int32_t v60 = 60;
+        arr_data->insert_data((const char*)(&v60), 0);
+        off = 6;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        auto array_column =
+                ColumnArray::create(make_nullable(std::move(arr_data)), 
std::move(arr_offsets));
+        auto nullable_array =
+                ColumnNullable::create(std::move(array_column), 
std::move(arr_nullmap));
+        result_block->insert(ColumnWithTypeAndName(std::move(nullable_array),
+                                                   
data_type_array_type_nullable, "tags"));
+        return result_block;
+    };
+
+    {
+        auto table_func_op = create_test_operators(desc_tbl, tplan_node);
+        auto* local_state = 
runtime_state->get_local_state(table_func_op->operator_id());
+        auto* tfl = dynamic_cast<TableFunctionLocalState*>(local_state);
+
+        tfl->_child_block = build_input();
+        tfl->_child_eos = true;
+
+        st = table_func_op->push(runtime_state.get(), tfl->_child_block.get(), 
tfl->_child_eos);
+        ASSERT_TRUE(st.ok()) << "push failed: " << st.to_string();
+
+        bool eos = false;
+        Block output_block;
+        st = table_func_op->pull(runtime_state.get(), &output_block, &eos);
+        ASSERT_TRUE(st.ok()) << "pull failed: " << st.to_string();
+
+        // Expected: 6 rows — NULL row (id=2) and empty row (id=4) are skipped
+        EXPECT_EQ(output_block.rows(), 6);
+
+        // Verify ids: [1,1,1,3,3,5]
+        auto id_col = output_block.get_by_position(0).column;
+        auto check_id = [&](size_t row, int32_t expected) {
+            auto field = (*id_col)[row];
+            EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+        };
+        check_id(0, 1);
+        check_id(1, 1);
+        check_id(2, 1);
+        check_id(3, 3);
+        check_id(4, 3);
+        check_id(5, 5);
+
+        // Verify tags: [10,20,30,40,50,60]
+        auto tag_col = output_block.get_by_position(2).column;
+        auto check_tag = [&](size_t row, int32_t expected) {
+            auto field = (*tag_col)[row];
+            EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+        };
+        check_tag(0, 10);
+        check_tag(1, 20);
+        check_tag(2, 30);
+        check_tag(3, 40);
+        check_tag(4, 50);
+        check_tag(5, 60);
+    }
+}
+
+// Test outer mode fast path with NULL and empty arrays (should emit NULL rows)
+TEST_F(UnnestTest, outer_with_nulls_fast_path) {
+    TDescriptorTable desc_table;
+    desc_table.tupleDescriptors.push_back(tuple0);
+    desc_table.tupleDescriptors.push_back(tuple1);
+    desc_table.tupleDescriptors.push_back(tuple2);
+    desc_table.slotDescriptors.push_back(slot_desc_id);
+    desc_table.slotDescriptors.push_back(slot_desc_tags);
+    desc_table.slotDescriptors.push_back(slot_desc_unnest_tag);
+    desc_table.slotDescriptors.push_back(slot_desc_id_2);
+    desc_table.slotDescriptors.push_back(slot_desc_unnest_tag2);
+
+    setup_exec_env();
+    runtime_state->_batch_size = 4096;
+
+    TPlanNode tplan_node;
+    setup_explode_plan_node(tplan_node, true); // outer, no conjuncts → fast 
path
+
+    DescriptorTbl* desc_tbl;
+    auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+    EXPECT_TRUE(st.ok());
+
+    DataTypePtr data_type_int(std::make_shared<DataTypeInt32>());
+    auto data_type_int_nullable = make_nullable(data_type_int);
+    DataTypePtr 
data_type_array_type(std::make_shared<DataTypeArray>(data_type_int_nullable));
+    auto data_type_array_type_nullable = make_nullable(data_type_array_type);
+
+    // Build input: ids=[1,2,3,4,5], arrays=[[10,20], NULL, [30], [], [40,50]]
+    auto build_input = [&]() {
+        auto result_block = std::make_unique<Block>();
+
+        auto id_column = ColumnInt32::create();
+        for (int32_t id : {1, 2, 3, 4, 5}) {
+            id_column->insert_data((const char*)(&id), 0);
+        }
+        
result_block->insert(ColumnWithTypeAndName(make_nullable(std::move(id_column)),
+                                                   data_type_int_nullable, 
"id"));
+
+        auto arr_data = ColumnInt32::create();
+        auto arr_offsets = ColumnOffset64::create();
+        auto arr_nullmap = ColumnUInt8::create();
+
+        // Row 0: [10, 20]
+        for (int32_t v : {10, 20}) {
+            arr_data->insert_data((const char*)(&v), 0);
+        }
+        ColumnArray::Offset64 off = 2;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 1: NULL
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(1);
+
+        // Row 2: [30]
+        int32_t v30 = 30;
+        arr_data->insert_data((const char*)(&v30), 0);
+        off = 3;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 3: []
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 4: [40, 50]
+        for (int32_t v : {40, 50}) {
+            arr_data->insert_data((const char*)(&v), 0);
+        }
+        off = 5;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        auto array_column =
+                ColumnArray::create(make_nullable(std::move(arr_data)), 
std::move(arr_offsets));
+        auto nullable_array =
+                ColumnNullable::create(std::move(array_column), 
std::move(arr_nullmap));
+        result_block->insert(ColumnWithTypeAndName(std::move(nullable_array),
+                                                   
data_type_array_type_nullable, "tags"));
+        return result_block;
+    };
+
+    {
+        auto table_func_op = create_test_operators(desc_tbl, tplan_node);
+        auto* local_state = 
runtime_state->get_local_state(table_func_op->operator_id());
+        auto* tfl = dynamic_cast<TableFunctionLocalState*>(local_state);
+
+        tfl->_child_block = build_input();
+        tfl->_child_eos = true;
+
+        st = table_func_op->push(runtime_state.get(), tfl->_child_block.get(), 
tfl->_child_eos);
+        ASSERT_TRUE(st.ok()) << "push failed: " << st.to_string();
+
+        bool eos = false;
+        Block output_block;
+        st = table_func_op->pull(runtime_state.get(), &output_block, &eos);
+        ASSERT_TRUE(st.ok()) << "pull failed: " << st.to_string();
+
+        // Expected: 7 rows
+        // (1,10), (1,20), (2,NULL), (3,30), (4,NULL), (5,40), (5,50)
+        EXPECT_EQ(output_block.rows(), 7);
+
+        // Verify ids: [1,1,2,3,4,5,5]
+        auto id_col = output_block.get_by_position(0).column;
+        auto check_id = [&](size_t row, int32_t expected) {
+            auto field = (*id_col)[row];
+            EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+        };
+        check_id(0, 1);
+        check_id(1, 1);
+        check_id(2, 2);
+        check_id(3, 3);
+        check_id(4, 4);
+        check_id(5, 5);
+        check_id(6, 5);
+
+        // Verify tags: [10, 20, NULL, 30, NULL, 40, 50]
+        auto tag_col = output_block.get_by_position(2).column;
+        auto check_tag = [&](size_t row, int32_t expected) {
+            auto field = (*tag_col)[row];
+            EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+        };
+        auto check_null = [&](size_t row) {
+            EXPECT_TRUE(tag_col->is_null_at(row)) << "row " << row << " should 
be null";
+        };
+        check_tag(0, 10);
+        check_tag(1, 20);
+        check_null(2);
+        check_tag(3, 30);
+        check_null(4);
+        check_tag(5, 40);
+        check_tag(6, 50);
+    }
+}
+// Test posexplode inner mode fast path with NULL and empty arrays
+TEST_F(UnnestTest, posexplode_with_nulls_fast_path) {
+    // Build struct type descriptor for posexplode output:
+    // STRUCT(pos: INT NOT NULL, val: NULLABLE<INT>)
+    TTypeNode type_node_struct;
+    type_node_struct.type = TTypeNodeType::STRUCT;
+    {
+        TStructField sf_pos;
+        sf_pos.__set_name("pos");
+        sf_pos.__set_contains_null(false);
+        TStructField sf_val;
+        sf_val.__set_name("val");
+        sf_val.__set_contains_null(true);
+        type_node_struct.__set_struct_fields({sf_pos, sf_val});
+    }
+
+    TTypeDesc type_desc_struct;
+    type_desc_struct.types.emplace_back(type_node_struct);
+    type_desc_struct.types.emplace_back(type_node_int); // pos sub-type
+    type_desc_struct.types.emplace_back(type_node_int); // val sub-type
+    type_desc_struct.byte_size = -1;
+
+    // Create new tuple/slot descriptors for posexplode using continuing IDs
+    // After constructor: slot_id=5, tuple_id=3, col_unique_id=3
+    TTupleDescriptor pe_tuple_tf;
+    pe_tuple_tf.id = tuple_id++;
+    pe_tuple_tf.byteSize = 0;
+    pe_tuple_tf.numNullBytes = 0;
+
+    TTupleDescriptor pe_tuple_proj;
+    pe_tuple_proj.id = tuple_id++;
+    pe_tuple_proj.byteSize = 0;
+    pe_tuple_proj.numNullBytes = 0;
+
+    // TF output slot: struct type
+    TSlotRef pe_slot_ref_out;
+    pe_slot_ref_out.slot_id = slot_id++;
+    pe_slot_ref_out.tuple_id = pe_tuple_tf.id;
+    pe_slot_ref_out.col_unique_id = -1;
+    pe_slot_ref_out.is_virtual_slot = false;
+
+    TSlotDescriptor pe_slot_desc_out;
+    pe_slot_desc_out.id = pe_slot_ref_out.slot_id;
+    pe_slot_desc_out.parent = pe_tuple_tf.id;
+    pe_slot_desc_out.slotType = type_desc_struct;
+    pe_slot_desc_out.columnPos = -1;
+    pe_slot_desc_out.byteOffset = 0;
+    pe_slot_desc_out.nullIndicatorByte = 0;
+    pe_slot_desc_out.nullIndicatorBit = 0;
+    pe_slot_desc_out.colName = "posexplode#3";
+    pe_slot_desc_out.slotIdx = -1;
+    pe_slot_desc_out.isMaterialized = true;
+    pe_slot_desc_out.col_unique_id = -1;
+    pe_slot_desc_out.is_key = false;
+    pe_slot_desc_out.need_materialize = true;
+    pe_slot_desc_out.is_auto_increment = false;
+    pe_slot_desc_out.primitive_type = TPrimitiveType::STRUCT;
+
+    // Projection slots
+    TSlotDescriptor pe_slot_desc_id_proj = slot_desc_id;
+    pe_slot_desc_id_proj.id = slot_id++;
+    pe_slot_desc_id_proj.parent = pe_tuple_proj.id;
+
+    TSlotDescriptor pe_slot_desc_out_proj = pe_slot_desc_out;
+    pe_slot_desc_out_proj.id = slot_id++;
+    pe_slot_desc_out_proj.parent = pe_tuple_proj.id;
+
+    TDescriptorTable desc_table;
+    desc_table.tupleDescriptors.push_back(tuple0);
+    desc_table.tupleDescriptors.push_back(pe_tuple_tf);
+    desc_table.tupleDescriptors.push_back(pe_tuple_proj);
+    desc_table.slotDescriptors.push_back(slot_desc_id);
+    desc_table.slotDescriptors.push_back(slot_desc_tags);
+    desc_table.slotDescriptors.push_back(pe_slot_desc_out);
+    desc_table.slotDescriptors.push_back(pe_slot_desc_id_proj);
+    desc_table.slotDescriptors.push_back(pe_slot_desc_out_proj);
+
+    setup_exec_env();
+    runtime_state->_batch_size = 4096;
+
+    // Build plan node for posexplode (inner mode, no conjuncts → fast path)
+    TPlanNode tplan_node;
+    {
+        tplan_node.node_id = 0;
+        tplan_node.node_type = TPlanNodeType::TABLE_FUNCTION_NODE;
+        tplan_node.num_children = 1;
+        tplan_node.limit = -1;
+
+        tplan_node.row_tuples.push_back(tuple0.id);
+        tplan_node.row_tuples.push_back(pe_tuple_tf.id);
+        tplan_node.nullable_tuples.push_back(false);
+        tplan_node.nullable_tuples.push_back(false);
+        tplan_node.compact_data = false;
+        tplan_node.is_serial_operator = false;
+
+        tplan_node.__set_table_function_node(TTableFunctionNode());
+
+        TExpr fn_expr;
+        fn_expr.nodes.emplace_back();
+        fn_expr.nodes[0].node_type = TExprNodeType::FUNCTION_CALL;
+        fn_expr.nodes[0].type.types.emplace_back(type_node_struct);
+        fn_expr.nodes[0].type.types.emplace_back(type_node_int);
+        fn_expr.nodes[0].type.types.emplace_back(type_node_int);
+        fn_expr.nodes[0].num_children = 1;
+
+        fn_expr.nodes[0].__set_fn(TFunction());
+        fn_expr.nodes[0].fn.__set_name(TFunctionName());
+        fn_expr.nodes[0].fn.name.function_name = "posexplode";
+        fn_expr.nodes[0].fn.arg_types.emplace_back(type_desc_array_int);
+        fn_expr.nodes[0].fn.ret_type = type_desc_struct;
+        fn_expr.nodes[0].fn.has_var_args = false;
+        fn_expr.nodes[0].fn.signature = "posexplode(array<int>)";
+        fn_expr.nodes[0].fn.__set_scalar_fn(TScalarFunction());
+        fn_expr.nodes[0].fn.scalar_fn.symbol = "";
+        fn_expr.nodes[0].fn.id = 0;
+        fn_expr.nodes[0].fn.vectorized = true;
+        fn_expr.nodes[0].fn.is_udtf_function = false;
+        fn_expr.nodes[0].fn.is_static_load = false;
+        fn_expr.nodes[0].fn.expiration_time = 360;
+        fn_expr.nodes[0].is_nullable = true;
+
+        fn_expr.nodes.emplace_back();
+        fn_expr.nodes[1].node_type = TExprNodeType::SLOT_REF;
+        fn_expr.nodes[1].type = type_desc_array_int;
+        fn_expr.nodes[1].num_children = 0;
+        fn_expr.nodes[1].__set_slot_ref(TSlotRef());
+        fn_expr.nodes[1].slot_ref = slot_ref_tags;
+        fn_expr.nodes[1].output_scale = -1;
+        fn_expr.nodes[1].is_nullable = true;
+        fn_expr.nodes[1].label = "tags";
+
+        tplan_node.table_function_node.fnCallExprList.push_back(fn_expr);
+
+        
tplan_node.table_function_node.outputSlotIds.push_back(slot_desc_id.id);
+        
tplan_node.table_function_node.outputSlotIds.push_back(pe_slot_desc_out.id);
+
+        // Projections
+        TExpr texpr_proj0;
+        texpr_proj0.nodes.emplace_back();
+        texpr_proj0.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        texpr_proj0.nodes[0].type = type_desc_int;
+        texpr_proj0.nodes[0].num_children = 0;
+        texpr_proj0.nodes[0].__set_slot_ref(TSlotRef());
+        texpr_proj0.nodes[0].slot_ref = slot_ref_id;
+        texpr_proj0.nodes[0].output_scale = -1;
+        texpr_proj0.nodes[0].is_nullable = true;
+        texpr_proj0.nodes[0].label = "id";
+
+        TExpr texpr_proj1;
+        texpr_proj1.nodes.emplace_back();
+        texpr_proj1.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        texpr_proj1.nodes[0].type = type_desc_struct;
+        texpr_proj1.nodes[0].num_children = 0;
+        texpr_proj1.nodes[0].__set_slot_ref(TSlotRef());
+        texpr_proj1.nodes[0].slot_ref = pe_slot_ref_out;
+        texpr_proj1.nodes[0].output_scale = -1;
+        texpr_proj1.nodes[0].is_nullable = true;
+        texpr_proj1.nodes[0].label = "posexplode";
+
+        tplan_node.projections.push_back(texpr_proj0);
+        tplan_node.projections.push_back(texpr_proj1);
+        tplan_node.output_tuple_id = pe_tuple_proj.id;
+        tplan_node.nereids_id = 144;
+    }
+
+    DescriptorTbl* desc_tbl;
+    auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+    EXPECT_TRUE(st.ok());
+
+    DataTypePtr data_type_int(std::make_shared<DataTypeInt32>());
+    auto data_type_int_nullable = make_nullable(data_type_int);
+    DataTypePtr 
data_type_array_type(std::make_shared<DataTypeArray>(data_type_int_nullable));
+    auto data_type_array_type_nullable = make_nullable(data_type_array_type);
+
+    // Build input: ids=[1,2,3,4], arrays=[[10,20], NULL, [30], []]
+    auto build_input = [&]() {
+        auto result_block = std::make_unique<Block>();
+
+        auto id_column = ColumnInt32::create();
+        for (int32_t id : {1, 2, 3, 4}) {
+            id_column->insert_data((const char*)(&id), 0);
+        }
+        
result_block->insert(ColumnWithTypeAndName(make_nullable(std::move(id_column)),
+                                                   data_type_int_nullable, 
"id"));
+
+        auto arr_data = ColumnInt32::create();
+        auto arr_offsets = ColumnOffset64::create();
+        auto arr_nullmap = ColumnUInt8::create();
+
+        // Row 0: [10, 20]
+        for (int32_t v : {10, 20}) {
+            arr_data->insert_data((const char*)(&v), 0);
+        }
+        ColumnArray::Offset64 off = 2;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 1: NULL
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(1);
+
+        // Row 2: [30]
+        int32_t v30 = 30;
+        arr_data->insert_data((const char*)(&v30), 0);
+        off = 3;
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        // Row 3: []
+        arr_offsets->insert_data((const char*)(&off), 0);
+        arr_nullmap->get_data().push_back(0);
+
+        auto array_column =
+                ColumnArray::create(make_nullable(std::move(arr_data)), 
std::move(arr_offsets));
+        auto nullable_array =
+                ColumnNullable::create(std::move(array_column), 
std::move(arr_nullmap));
+        result_block->insert(ColumnWithTypeAndName(std::move(nullable_array),
+                                                   
data_type_array_type_nullable, "tags"));
+        return result_block;
+    };
+
+    {
+        auto table_func_op = create_test_operators(desc_tbl, tplan_node);
+        auto* local_state = 
runtime_state->get_local_state(table_func_op->operator_id());
+        auto* tfl = dynamic_cast<TableFunctionLocalState*>(local_state);
+
+        tfl->_child_block = build_input();
+        tfl->_child_eos = true;
+
+        st = table_func_op->push(runtime_state.get(), tfl->_child_block.get(), 
tfl->_child_eos);
+        ASSERT_TRUE(st.ok()) << "push failed: " << st.to_string();
+
+        bool eos = false;
+        Block output_block;
+        st = table_func_op->pull(runtime_state.get(), &output_block, &eos);
+        ASSERT_TRUE(st.ok()) << "pull failed: " << st.to_string();
+
+        // Expected: 3 rows (NULL and empty arrays are skipped in inner mode)
+        // Row 0: id=1, struct=(pos=0, val=10)
+        // Row 1: id=1, struct=(pos=1, val=20)
+        // Row 2: id=3, struct=(pos=0, val=30)
+        EXPECT_EQ(output_block.rows(), 3);
+
+        // Verify ids: [1, 1, 3]
+        auto id_col = output_block.get_by_position(0).column;
+        auto check_id = [&](size_t row, int32_t expected) {
+            auto field = (*id_col)[row];
+            EXPECT_EQ(field.get<TYPE_INT>(), expected) << "row " << row;
+        };
+        check_id(0, 1);
+        check_id(1, 1);
+        check_id(2, 3);
+
+        // Verify struct output column at position 2
+        auto out_col = output_block.get_by_position(2).column;
+        auto* nullable_out = assert_cast<const ColumnNullable*>(out_col.get());
+        for (size_t i = 0; i < 3; ++i) {
+            EXPECT_FALSE(nullable_out->is_null_at(i))
+                    << "struct row " << i << " should not be null";
+        }
+        auto* struct_col = assert_cast<const 
ColumnStruct*>(&nullable_out->get_nested_column());
+        auto* pos_col = assert_cast<const 
ColumnInt32*>(&struct_col->get_column(0));
+        auto* val_nullable = assert_cast<const 
ColumnNullable*>(&struct_col->get_column(1));
+        auto* val_col = assert_cast<const 
ColumnInt32*>(&val_nullable->get_nested_column());
+
+        // Verify positions: [0, 1, 0]
+        EXPECT_EQ(pos_col->get_element(0), 0);
+        EXPECT_EQ(pos_col->get_element(1), 1);
+        EXPECT_EQ(pos_col->get_element(2), 0);
+
+        // Verify values: [10, 20, 30]
+        EXPECT_EQ(val_col->get_element(0), 10);
+        EXPECT_EQ(val_col->get_element(1), 20);
+        EXPECT_EQ(val_col->get_element(2), 30);
+    }
+}
 } // namespace doris
diff --git a/be/test/exec/operator/union_operator_test.cpp 
b/be/test/exec/operator/union_operator_test.cpp
index f57c34d1be3..3009e977a47 100644
--- a/be/test/exec/operator/union_operator_test.cpp
+++ b/be/test/exec/operator/union_operator_test.cpp
@@ -51,7 +51,7 @@ struct MockUnionSinkOperator : public UnionSinkOperatorX {
 struct UnionOperatorTest : public ::testing::Test {
     void SetUp() override {
         state = std::make_shared<MockRuntimeState>();
-        state->batsh_size = 10;
+        state->_batch_size = 10;
         for (int i = 0; i < child_size; i++) {
             sink_state.push_back(std::make_shared<MockRuntimeState>());
             sink_ops.push_back(nullptr);
@@ -75,7 +75,7 @@ struct UnionOperatorTest : public ::testing::Test {
 };
 
 TEST_F(UnionOperatorTest, test_all_const_expr) {
-    state->batsh_size = 2;
+    state->_batch_size = 2;
     source_op.reset(new MockUnionSourceOperator {
             0,
             {std::make_shared<DataTypeInt64>(), 
std::make_shared<DataTypeInt64>(),
@@ -254,7 +254,7 @@ TEST_F(UnionOperatorTest, test_sink_and_source) {
 
     {
         for (int i = 0; i < child_size; i++) {
-            sink_state[i]->batsh_size = 2;
+            sink_state[i]->_batch_size = 2;
             Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, 
{3, 4});
             EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
         }
diff --git a/be/test/exec/sink/arrow_result_block_buffer_test.cpp 
b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
index a87a03d1542..46d7daf6c37 100644
--- a/be/test/exec/sink/arrow_result_block_buffer_test.cpp
+++ b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
@@ -76,7 +76,7 @@ private:
 
 TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) {
     MockRuntimeState state;
-    state.batsh_size = 1;
+    state._batch_size = 1;
     int buffer_size = 16;
     auto dep = Dependency::create_shared(0, 0, "Test", true);
     auto ins_id = TUniqueId();
@@ -203,7 +203,7 @@ TEST_F(ArrowResultBlockBufferTest, 
TestArrowResultBlockBuffer) {
 
 TEST_F(ArrowResultBlockBufferTest, TestCancelArrowResultBlockBuffer) {
     MockRuntimeState state;
-    state.batsh_size = 1;
+    state._batch_size = 1;
     int buffer_size = 16;
     auto dep = Dependency::create_shared(0, 0, "Test", true);
     auto ins_id = TUniqueId();
@@ -277,7 +277,7 @@ TEST_F(ArrowResultBlockBufferTest, 
TestCancelArrowResultBlockBuffer) {
 
 TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
     MockRuntimeState state;
-    state.batsh_size = 1;
+    state._batch_size = 1;
     int buffer_size = 16;
     auto dep = Dependency::create_shared(0, 0, "Test", true);
     auto ins_id = TUniqueId();
@@ -340,7 +340,7 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
 
 TEST_F(ArrowResultBlockBufferTest, TestArrowResultSerializeFailure) {
     MockRuntimeState state;
-    state.batsh_size = 1;
+    state._batch_size = 1;
     int buffer_size = 16;
     auto dep = Dependency::create_shared(0, 0, "Test", true);
     auto ins_id = TUniqueId();
diff --git a/be/test/exec/sink/result_block_buffer_test.cpp 
b/be/test/exec/sink/result_block_buffer_test.cpp
index 46ea26c3d79..065472cd63e 100644
--- a/be/test/exec/sink/result_block_buffer_test.cpp
+++ b/be/test/exec/sink/result_block_buffer_test.cpp
@@ -63,7 +63,7 @@ private:
 
 TEST_F(MysqlResultBlockBufferTest, TestMySQLResultBlockBuffer) {
     MockRuntimeState state;
-    state.batsh_size = 1;
+    state._batch_size = 1;
     int buffer_size = 16;
     auto dep = Dependency::create_shared(0, 0, "Test", true);
     auto ins_id = TUniqueId();
@@ -190,7 +190,7 @@ TEST_F(MysqlResultBlockBufferTest, 
TestMySQLResultBlockBuffer) {
 
 TEST_F(MysqlResultBlockBufferTest, TestCancelMySQLResultBlockBuffer) {
     MockRuntimeState state;
-    state.batsh_size = 1;
+    state._batch_size = 1;
     int buffer_size = 16;
     auto dep = Dependency::create_shared(0, 0, "Test", true);
     auto ins_id = TUniqueId();
@@ -265,7 +265,7 @@ TEST_F(MysqlResultBlockBufferTest, 
TestCancelMySQLResultBlockBuffer) {
 
 TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
     MockRuntimeState state;
-    state.batsh_size = 1;
+    state._batch_size = 1;
     int buffer_size = 16;
     auto dep = Dependency::create_shared(0, 0, "Test", true);
     auto ins_id = TUniqueId();
diff --git a/be/test/testutil/mock/mock_runtime_state.h 
b/be/test/testutil/mock/mock_runtime_state.h
index a9ea7eaf89f..f1dbc16a74b 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_runtime_state.h
@@ -54,7 +54,7 @@ public:
                      ExecEnv* exec_env, QueryContext* ctx)
             : RuntimeState(query_id, fragment_id, query_options, 
query_globals, exec_env, ctx) {}
 
-    int batch_size() const override { return batsh_size; }
+    int batch_size() const override { return _batch_size; }
 
     bool enable_shared_exchange_sink_buffer() const override {
         return _enable_shared_exchange_sink_buffer;
@@ -72,7 +72,7 @@ public:
     WorkloadGroupPtr workload_group() override { return _workload_group; }
 
     // default batch size
-    int batsh_size = 4096;
+    int _batch_size = 4096;
     bool _enable_shared_exchange_sink_buffer = true;
     bool _enable_share_hash_table_for_broadcast_join = true;
     std::shared_ptr<MockContext> _mock_context = 
std::make_shared<MockContext>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to