This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 664fbffcba [Enchancement](table-function) optimization for  vectorized 
table function (#17973)
664fbffcba is described below

commit 664fbffcbaf861d8ce8b1e9211c5dfc8c01bf130
Author: Pxl <[email protected]>
AuthorDate: Wed Mar 29 10:45:00 2023 +0800

    [Enchancement](table-function) optimization for  vectorized table function 
(#17973)
---
 be/src/vec/exec/vtable_function_node.cpp           | 75 ++++++++++-----------
 be/src/vec/exec/vtable_function_node.h             | 31 ++++++---
 be/src/vec/exprs/table_function/table_function.h   | 58 ++++++++--------
 .../exprs/table_function/table_function_factory.h  |  3 +-
 be/src/vec/exprs/table_function/vexplode.cpp       | 53 +++------------
 be/src/vec/exprs/table_function/vexplode.h         | 14 ++--
 .../vec/exprs/table_function/vexplode_bitmap.cpp   | 78 ++++++++--------------
 be/src/vec/exprs/table_function/vexplode_bitmap.h  | 10 +--
 .../exprs/table_function/vexplode_json_array.cpp   | 40 +++--------
 .../vec/exprs/table_function/vexplode_json_array.h | 35 ++++------
 .../vec/exprs/table_function/vexplode_numbers.cpp  | 74 +++++++++++---------
 be/src/vec/exprs/table_function/vexplode_numbers.h | 34 ++++++++--
 be/src/vec/exprs/table_function/vexplode_split.cpp | 40 +++--------
 be/src/vec/exprs/table_function/vexplode_split.h   |  6 +-
 be/test/vec/function/function_test_util.cpp        | 24 ++-----
 .../test_varchar_schema_change.groovy              | 12 ++--
 16 files changed, 249 insertions(+), 338 deletions(-)

diff --git a/be/src/vec/exec/vtable_function_node.cpp 
b/be/src/vec/exec/vtable_function_node.cpp
index 32cfee7234..39ffb4fad7 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -43,8 +43,6 @@ Status VTableFunctionNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
         _fns.push_back(fn);
     }
     _fn_num = _fns.size();
-    _fn_values.resize(_fn_num);
-    _fn_value_lengths.resize(_fn_num);
 
     // Prepare output slot ids
     RETURN_IF_ERROR(_prepare_output_slot_ids(tnode));
@@ -104,6 +102,14 @@ Status VTableFunctionNode::prepare(RuntimeState* state) {
         }
     }
 
+    for (size_t i = 0; i < _child_slots.size(); i++) {
+        if (_slot_need_copy(i)) {
+            _output_slot_indexs.push_back(i);
+        } else {
+            _useless_slot_indexs.push_back(i);
+        }
+    }
+
     _cur_child_offset = -1;
 
     return Status::OK();
@@ -121,7 +127,7 @@ Status VTableFunctionNode::get_next(RuntimeState* state, 
Block* block, bool* eos
         RETURN_IF_ERROR_AND_CHECK_SPAN(
                 child(0)->get_next_after_projects(
                         state, &_child_block, &_child_eos,
-                        std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*, bool*)) &
+                        std::bind((Status(ExecNode::*)(RuntimeState*, Block*, 
bool*)) &
                                           ExecNode::get_next,
                                   _children[0], std::placeholders::_1, 
std::placeholders::_2,
                                   std::placeholders::_3)),
@@ -133,11 +139,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, 
Block* block, bool* eos
     return pull(state, block, eos);
 }
 
-Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* 
output_block, bool* eos) {
+Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* 
output_block,
+                                               bool* eos) {
     size_t column_size = _output_slots.size();
     bool mem_reuse = output_block->mem_reuse();
 
-    std::vector<vectorized::MutableColumnPtr> columns(column_size);
+    std::vector<MutableColumnPtr> columns(column_size);
     for (size_t i = 0; i < column_size; i++) {
         if (mem_reuse) {
             columns[i] = 
std::move(*output_block->get_by_position(i).column).mutate();
@@ -146,6 +153,12 @@ Status 
VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
         }
     }
 
+    for (int i = 0; i < _fn_num; i++) {
+        if (columns[i + _child_slots.size()]->is_nullable()) {
+            _fns[i]->set_nullable();
+        }
+    }
+
     while (columns[_child_slots.size()]->size() < state->batch_size()) {
         RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while 
getting next batch."));
@@ -158,6 +171,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* 
state, Block* output
         while (columns[_child_slots.size()]->size() < state->batch_size()) {
             int idx = _find_last_fn_eos_idx();
             if (idx == 0 || skip_child_row) {
+                _copy_output_slots(columns);
                 // all table functions' results are exhausted, process next 
child row.
                 RETURN_IF_ERROR(_process_next_child_row());
                 if (_cur_child_offset == -1) {
@@ -175,41 +189,25 @@ Status 
VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
             if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
                 continue;
             }
-
-            // get slots from every table function.
-            // notice that _fn_values[i] may be null if the table function has 
empty result set.
-            for (int i = 0; i < _fn_num; i++) {
-                RETURN_IF_ERROR(_fns[i]->get_value(&_fn_values[i]));
-                
RETURN_IF_ERROR(_fns[i]->get_value_length(&_fn_value_lengths[i]));
-            }
-
-            // The tuples order in parent row batch should be
-            //      child1, child2, tf1, tf2, ...
-
-            // 1. copy data from child_block.
-            for (int i = 0; i < _child_slots.size(); i++) {
-                if (!slot_need_copy(i)) {
-                    columns[i]->insert_default();
-                    continue;
+            if (_fn_num == 1) {
+                _current_row_insert_times += _fns[0]->get_value(
+                        columns[_child_slots.size()],
+                        state->batch_size() - 
columns[_child_slots.size()]->size());
+            } else {
+                for (int i = 0; i < _fn_num; i++) {
+                    _fns[i]->get_value(columns[i + _child_slots.size()]);
                 }
-                auto src_column = _child_block.get_by_position(i).column;
-                columns[i]->insert_from(*src_column, _cur_child_offset);
+                _current_row_insert_times++;
+                _fns[_fn_num - 1]->forward();
             }
+        }
+    }
 
-            // 2. copy function result
-            for (int i = 0; i < _fns.size(); i++) {
-                int output_slot_idx = i + _child_slots.size();
-                if (_fn_values[i] == nullptr) {
-                    columns[output_slot_idx]->insert_default();
-                } else {
-                    
columns[output_slot_idx]->insert_data(reinterpret_cast<char*>(_fn_values[i]),
-                                                          
_fn_value_lengths[i]);
-                }
-            }
+    _copy_output_slots(columns);
 
-            bool tmp = false;
-            _fns[_fn_num - 1]->forward(&tmp);
-        }
+    size_t row_size = columns[_child_slots.size()]->size();
+    for (auto index : _useless_slot_indexs) {
+        columns[index]->insert_many_defaults(row_size - 
columns[index]->size());
     }
 
     if (!columns.empty() && !columns[0]->empty()) {
@@ -292,11 +290,10 @@ int VTableFunctionNode::_find_last_fn_eos_idx() {
 //  If `last_eos_idx` is 1, which means f2 and f3 are eos.
 //  So we need to forward f1, and reset f2 and f3.
 bool VTableFunctionNode::_roll_table_functions(int last_eos_idx) {
-    bool fn_eos = false;
     int i = last_eos_idx - 1;
     for (; i >= 0; --i) {
-        _fns[i]->forward(&fn_eos);
-        if (!fn_eos) {
+        _fns[i]->forward();
+        if (!_fns[i]->eos()) {
             break;
         }
     }
diff --git a/be/src/vec/exec/vtable_function_node.h 
b/be/src/vec/exec/vtable_function_node.h
index 99d2394514..2aad138f4a 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -39,7 +39,7 @@ public:
     bool need_more_input_data() const { return !_child_block.rows() && 
!_child_eos; }
 
     void release_resource(doris::RuntimeState* state) override {
-        vectorized::VExpr::close(_vfn_ctxs, state);
+        VExpr::close(_vfn_ctxs, state);
 
         if (_num_rows_filtered_counter != nullptr) {
             COUNTER_SET(_num_rows_filtered_counter, 
static_cast<int64_t>(_num_rows_filtered));
@@ -47,7 +47,7 @@ public:
         ExecNode::release_resource(state);
     }
 
-    Status push(RuntimeState*, vectorized::Block* input_block, bool eos) 
override {
+    Status push(RuntimeState*, Block* input_block, bool eos) override {
         _child_eos = eos;
         if (input_block->rows() == 0) {
             return Status::OK();
@@ -60,8 +60,8 @@ public:
         return Status::OK();
     }
 
-    Status pull(RuntimeState* state, vectorized::Block* output_block, bool* 
eos) override {
-        RETURN_IF_ERROR(get_expanded_block(state, output_block, eos));
+    Status pull(RuntimeState* state, Block* output_block, bool* eos) override {
+        RETURN_IF_ERROR(_get_expanded_block(state, output_block, eos));
         reached_limit(output_block, eos);
         return Status::OK();
     }
@@ -97,26 +97,39 @@ private:
             1. FE: create a new output tuple based on the real output slots;
             2. BE: refactor (V)TableFunctionNode output rows based on the new 
tuple;
     */
-    inline bool slot_need_copy(SlotId slot_id) const {
+    inline bool _slot_need_copy(SlotId slot_id) const {
         auto id = _output_slots[slot_id]->id();
         return (id < _output_slot_ids.size()) && (_output_slot_ids[id]);
     }
 
-    Status get_expanded_block(RuntimeState* state, Block* output_block, bool* 
eos);
+    Status _get_expanded_block(RuntimeState* state, Block* output_block, bool* 
eos);
+
+    void _copy_output_slots(std::vector<MutableColumnPtr>& columns) {
+        if (!_current_row_insert_times) {
+            return;
+        }
+        for (auto index : _output_slot_indexs) {
+            auto src_column = _child_block.get_by_position(index).column;
+            columns[index]->insert_many_from(*src_column, _cur_child_offset,
+                                             _current_row_insert_times);
+        }
+        _current_row_insert_times = 0;
+    }
+    int _current_row_insert_times = 0;
 
     Block _child_block;
     std::vector<SlotDescriptor*> _child_slots;
     std::vector<SlotDescriptor*> _output_slots;
     int64_t _cur_child_offset = 0;
 
-    std::vector<vectorized::VExprContext*> _vfn_ctxs;
+    std::vector<VExprContext*> _vfn_ctxs;
 
     std::vector<TableFunction*> _fns;
-    std::vector<void*> _fn_values;
-    std::vector<int64_t> _fn_value_lengths;
     int _fn_num = 0;
 
     std::vector<bool> _output_slot_ids;
+    std::vector<int> _output_slot_indexs;
+    std::vector<int> _useless_slot_indexs;
 
     std::vector<int> _child_slot_sizes;
     // indicate if child node reach the end
diff --git a/be/src/vec/exprs/table_function/table_function.h 
b/be/src/vec/exprs/table_function/table_function.h
index 68e6829df5..7fff88899b 100644
--- a/be/src/vec/exprs/table_function/table_function.h
+++ b/be/src/vec/exprs/table_function/table_function.h
@@ -30,44 +30,49 @@ constexpr auto COMBINATOR_SUFFIX_OUTER = "_outer";
 
 class TableFunction {
 public:
-    virtual ~TableFunction() {}
+    virtual ~TableFunction() = default;
 
     virtual Status prepare() { return Status::OK(); }
 
     virtual Status open() { return Status::OK(); }
 
-    // only used for vectorized.
-    virtual Status process_init(vectorized::Block* block) = 0;
+    virtual Status process_init(Block* block) = 0;
 
-    // only used for vectorized.
-    virtual Status process_row(size_t row_idx) = 0;
+    virtual Status process_row(size_t row_idx) {
+        _cur_size = 0;
+        return reset();
+    }
 
     // only used for vectorized.
     virtual Status process_close() = 0;
 
-    virtual Status reset() = 0;
+    virtual Status reset() {
+        _eos = false;
+        _cur_offset = 0;
+        return Status::OK();
+    }
 
-    virtual Status get_value(void** output) = 0;
+    virtual void get_value(MutableColumnPtr& column) = 0;
 
-    // only used for vectorized.
-    virtual Status get_value_length(int64_t* length) {
-        *length = -1;
-        return Status::OK();
+    virtual int get_value(MutableColumnPtr& column, int max_step) {
+        max_step = std::max(1, std::min(max_step, (int)(_cur_size - 
_cur_offset)));
+        int i = 0;
+        for (; i < max_step && !eos(); i++) {
+            get_value(column);
+            forward();
+        }
+        return i;
     }
 
     virtual Status close() { return Status::OK(); }
 
-    virtual Status forward(bool* eos) {
-        if (_is_current_empty) {
-            *eos = true;
+    virtual Status forward(int step = 1) {
+        if (current_empty()) {
             _eos = true;
         } else {
-            ++_cur_offset;
-            if (_cur_offset == _cur_size) {
-                *eos = true;
+            _cur_offset += step;
+            if (_cur_offset >= _cur_size) {
                 _eos = true;
-            } else {
-                *eos = false;
             }
         }
         return Status::OK();
@@ -76,9 +81,8 @@ public:
     std::string name() const { return _fn_name; }
     bool eos() const { return _eos; }
 
-    void set_vexpr_context(vectorized::VExprContext* vexpr_context) {
-        _vexpr_context = vexpr_context;
-    }
+    void set_vexpr_context(VExprContext* vexpr_context) { _vexpr_context = 
vexpr_context; }
+    void set_nullable() { _is_nullable = true; }
 
     bool is_outer() const { return _is_outer; }
     void set_outer() {
@@ -89,21 +93,21 @@ public:
         _fn_name += COMBINATOR_SUFFIX_OUTER;
     }
 
-    bool current_empty() const { return _is_current_empty; }
+    bool current_empty() const { return _cur_size == 0; }
 
 protected:
     std::string _fn_name;
-    vectorized::VExprContext* _vexpr_context = nullptr;
+    VExprContext* _vexpr_context = nullptr;
     // true if there is no more data can be read from this function.
     bool _eos = false;
-    // true means the function result set from current row is empty(eg, source 
value is null or empty).
-    // so that when calling reset(), we can do nothing and keep eos as false.
-    bool _is_current_empty = false;
     // the position of current cursor
     int64_t _cur_offset = 0;
     // the size of current result
     int64_t _cur_size = 0;
     // set _is_outer to false for explode function, and should not return 
tuple while array is null or empty
     bool _is_outer = false;
+
+    bool _is_nullable = false;
+    bool _is_const = false;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/table_function_factory.h 
b/be/src/vec/exprs/table_function/table_function_factory.h
index 456dba0ddc..d299af0a9c 100644
--- a/be/src/vec/exprs/table_function/table_function_factory.h
+++ b/be/src/vec/exprs/table_function/table_function_factory.h
@@ -36,8 +36,7 @@ namespace vectorized {
 class TableFunction;
 class TableFunctionFactory {
 public:
-    TableFunctionFactory() {}
-    ~TableFunctionFactory() {}
+    TableFunctionFactory() = delete;
     static Status get_fn(const std::string& fn_name_raw, ObjectPool* pool, 
TableFunction** fn);
 
     const static std::unordered_map<std::string, 
std::function<TableFunction*()>> _function_map;
diff --git a/be/src/vec/exprs/table_function/vexplode.cpp 
b/be/src/vec/exprs/table_function/vexplode.cpp
index aa0249916a..b2c93196a8 100644
--- a/be/src/vec/exprs/table_function/vexplode.cpp
+++ b/be/src/vec/exprs/table_function/vexplode.cpp
@@ -26,7 +26,7 @@ VExplodeTableFunction::VExplodeTableFunction() {
     _fn_name = "vexplode";
 }
 
-Status VExplodeTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeTableFunction::process_init(Block* block) {
     CHECK(_vexpr_context->root()->children().size() == 1)
             << "VExplodeTableFunction only support 1 child but has "
             << _vexpr_context->root()->children().size();
@@ -48,17 +48,12 @@ Status 
VExplodeTableFunction::process_init(vectorized::Block* block) {
 
 Status VExplodeTableFunction::process_row(size_t row_idx) {
     DCHECK(row_idx < _array_column->size());
-    _is_current_empty = false;
-    _eos = false;
-    _cur_offset = 0;
-    _array_offset = (*_detail.offsets_ptr)[row_idx - 1];
-    _cur_size = (*_detail.offsets_ptr)[row_idx] - _array_offset;
+    RETURN_IF_ERROR(TableFunction::process_row(row_idx));
 
-    // array is NULL, or array is empty
-    if (_cur_size == 0 || (_detail.array_nullmap_data && 
_detail.array_nullmap_data[row_idx])) {
-        _is_current_empty = true;
+    if (!_detail.array_nullmap_data || !_detail.array_nullmap_data[row_idx]) {
+        _array_offset = (*_detail.offsets_ptr)[row_idx - 1];
+        _cur_size = (*_detail.offsets_ptr)[row_idx] - _array_offset;
     }
-
     return Status::OK();
 }
 
@@ -69,42 +64,14 @@ Status VExplodeTableFunction::process_close() {
     return Status::OK();
 }
 
-Status VExplodeTableFunction::reset() {
-    _eos = false;
-    _cur_offset = 0;
-    return Status::OK();
-}
-
-Status VExplodeTableFunction::get_value(void** output) {
-    if (_is_current_empty) {
-        *output = nullptr;
-        return Status::OK();
-    }
-
+void VExplodeTableFunction::get_value(MutableColumnPtr& column) {
     size_t pos = _array_offset + _cur_offset;
-    if (_detail.nested_nullmap_data && _detail.nested_nullmap_data[pos]) {
-        *output = nullptr;
+    if (current_empty() || (_detail.nested_nullmap_data && 
_detail.nested_nullmap_data[pos])) {
+        column->insert_default();
     } else {
-        *output = const_cast<char*>(_detail.nested_col->get_data_at(pos).data);
+        
column->insert_data(const_cast<char*>(_detail.nested_col->get_data_at(pos).data),
+                            _detail.nested_col->get_data_at(pos).size);
     }
-
-    return Status::OK();
-}
-
-Status VExplodeTableFunction::get_value_length(int64_t* length) {
-    if (_is_current_empty) {
-        *length = -1;
-        return Status::OK();
-    }
-
-    size_t pos = _array_offset + _cur_offset;
-    if (_detail.nested_nullmap_data && _detail.nested_nullmap_data[pos]) {
-        *length = 0;
-    } else {
-        *length = _detail.nested_col->get_data_at(pos).size;
-    }
-
-    return Status::OK();
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode.h 
b/be/src/vec/exprs/table_function/vexplode.h
index 3bc8ba9ef0..8911c8f225 100644
--- a/be/src/vec/exprs/table_function/vexplode.h
+++ b/be/src/vec/exprs/table_function/vexplode.h
@@ -30,14 +30,12 @@ class VExplodeTableFunction : public TableFunction {
 public:
     VExplodeTableFunction();
 
-    virtual ~VExplodeTableFunction() = default;
-
-    virtual Status process_init(vectorized::Block* block) override;
-    virtual Status process_row(size_t row_idx) override;
-    virtual Status process_close() override;
-    virtual Status reset() override;
-    virtual Status get_value(void** output) override;
-    virtual Status get_value_length(int64_t* length) override;
+    ~VExplodeTableFunction() override = default;
+
+    Status process_init(Block* block) override;
+    Status process_row(size_t row_idx) override;
+    Status process_close() override;
+    void get_value(MutableColumnPtr& column) override;
 
 private:
     ColumnPtr _array_column;
diff --git a/be/src/vec/exprs/table_function/vexplode_bitmap.cpp 
b/be/src/vec/exprs/table_function/vexplode_bitmap.cpp
index f680922e7c..8dabbac8c3 100644
--- a/be/src/vec/exprs/table_function/vexplode_bitmap.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_bitmap.cpp
@@ -19,6 +19,8 @@
 
 #include "common/status.h"
 #include "util/bitmap_value.h"
+#include "vec/columns/columns_number.h"
+#include "vec/exprs/table_function/table_function.h"
 #include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
@@ -27,7 +29,7 @@ VExplodeBitmapTableFunction::VExplodeBitmapTableFunction() {
     _fn_name = "vexplode_bitmap";
 }
 
-Status VExplodeBitmapTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeBitmapTableFunction::process_init(Block* block) {
     CHECK(_vexpr_context->root()->children().size() == 1)
             << "VExplodeNumbersTableFunction must be have 1 children but have "
             << _vexpr_context->root()->children().size();
@@ -42,68 +44,53 @@ Status 
VExplodeBitmapTableFunction::process_init(vectorized::Block* block) {
 
 Status VExplodeBitmapTableFunction::reset() {
     _eos = false;
-    if (!_is_current_empty) {
-        _reset_iterator();
+    _cur_offset = 0;
+    if (!current_empty()) {
+        _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap));
     }
     return Status::OK();
 }
 
-Status VExplodeBitmapTableFunction::forward(bool* eos) {
-    if (_is_current_empty) {
-        *eos = true;
-        _eos = true;
-    } else {
-        ++(*_cur_iter);
-        ++_cur_offset;
-        if (_cur_offset == _cur_size) {
-            *eos = true;
-            _eos = true;
-        } else {
-            _cur_value = **_cur_iter;
-            *eos = false;
+Status VExplodeBitmapTableFunction::forward(int step) {
+    if (!current_empty()) {
+        for (int i = 0; i < step; i++) {
+            ++(*_cur_iter);
         }
     }
-    return Status::OK();
+    return TableFunction::forward(step);
 }
 
-Status VExplodeBitmapTableFunction::get_value(void** output) {
-    if (_is_current_empty) {
-        *output = nullptr;
+void VExplodeBitmapTableFunction::get_value(MutableColumnPtr& column) {
+    if (current_empty()) {
+        column->insert_default();
     } else {
-        *output = &_cur_value;
+        if (_is_nullable) {
+            static_cast<ColumnInt64*>(
+                    
static_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
+                    ->insert_value(**_cur_iter);
+            static_cast<ColumnUInt8*>(
+                    
static_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+                    ->insert_default();
+        } else {
+            static_cast<ColumnInt64*>(column.get())->insert_value(**_cur_iter);
+        }
     }
-    return Status::OK();
-}
-
-void VExplodeBitmapTableFunction::_reset_iterator() {
-    DCHECK(_cur_bitmap->cardinality() > 0) << _cur_bitmap->cardinality();
-    _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap));
-    _cur_value = **_cur_iter;
-    _cur_offset = 0;
 }
 
 Status VExplodeBitmapTableFunction::process_row(size_t row_idx) {
-    _eos = false;
-    _is_current_empty = false;
-    _cur_size = 0;
-    _cur_offset = 0;
+    RETURN_IF_ERROR(TableFunction::process_row(row_idx));
 
     StringRef value = _value_column->get_data_at(row_idx);
 
-    if (value.data == nullptr) {
-        _is_current_empty = true;
-    } else {
+    if (value.data) {
         _cur_bitmap = reinterpret_cast<const BitmapValue*>(value.data);
 
         _cur_size = _cur_bitmap->cardinality();
-        if (_cur_size == 0) {
-            _is_current_empty = true;
-        } else {
-            _reset_iterator();
+        if (!current_empty()) {
+            _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap));
         }
     }
 
-    _is_current_empty = (_cur_size == 0);
     return Status::OK();
 }
 
@@ -112,13 +99,4 @@ Status VExplodeBitmapTableFunction::process_close() {
     return Status::OK();
 }
 
-Status VExplodeBitmapTableFunction::get_value_length(int64_t* length) {
-    if (_is_current_empty) {
-        *length = -1;
-    } else {
-        *length = sizeof(uint64_t);
-    }
-    return Status::OK();
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_bitmap.h 
b/be/src/vec/exprs/table_function/vexplode_bitmap.h
index 3d4b7c2ff0..dd47261c38 100644
--- a/be/src/vec/exprs/table_function/vexplode_bitmap.h
+++ b/be/src/vec/exprs/table_function/vexplode_bitmap.h
@@ -29,13 +29,12 @@ public:
     ~VExplodeBitmapTableFunction() override = default;
 
     Status reset() override;
-    Status get_value(void** output) override;
-    Status forward(bool* eos) override;
+    void get_value(MutableColumnPtr& column) override;
+    Status forward(int step = 1) override;
 
-    Status process_init(vectorized::Block* block) override;
+    Status process_init(Block* block) override;
     Status process_row(size_t row_idx) override;
     Status process_close() override;
-    Status get_value_length(int64_t* length) override;
 
 private:
     void _reset_iterator();
@@ -43,9 +42,6 @@ private:
     const BitmapValue* _cur_bitmap = nullptr;
     // iterator of _cur_bitmap
     std::unique_ptr<BitmapValueIterator> _cur_iter = nullptr;
-    // current value read from bitmap, it will be referenced by
-    // table function scan node.
-    uint64_t _cur_value = 0;
     ColumnPtr _value_column;
 };
 
diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.cpp 
b/be/src/vec/exprs/table_function/vexplode_json_array.cpp
index f12fa617bc..b67467ea98 100644
--- a/be/src/vec/exprs/table_function/vexplode_json_array.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_json_array.cpp
@@ -127,7 +127,7 @@ 
VExplodeJsonArrayTableFunction::VExplodeJsonArrayTableFunction(ExplodeJsonArrayT
     _fn_name = "vexplode_json_array";
 }
 
-Status VExplodeJsonArrayTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeJsonArrayTableFunction::process_init(Block* block) {
     CHECK(_vexpr_context->root()->children().size() == 1)
             << _vexpr_context->root()->children().size();
 
@@ -139,28 +139,15 @@ Status 
VExplodeJsonArrayTableFunction::process_init(vectorized::Block* block) {
     return Status::OK();
 }
 
-Status VExplodeJsonArrayTableFunction::reset() {
-    _eos = false;
-    _cur_offset = 0;
-    return Status::OK();
-}
-
 Status VExplodeJsonArrayTableFunction::process_row(size_t row_idx) {
-    _is_current_empty = false;
-    _eos = false;
+    RETURN_IF_ERROR(TableFunction::process_row(row_idx));
 
     StringRef text = _text_column->get_data_at(row_idx);
-    if (text.data == nullptr) {
-        _is_current_empty = true;
-    } else {
+    if (text.data != nullptr) {
         rapidjson::Document document;
         document.Parse(text.data, text.size);
-        if (UNLIKELY(document.HasParseError()) || !document.IsArray() ||
-            document.GetArray().Size() == 0) {
-            _is_current_empty = true;
-        } else {
+        if (!document.HasParseError() && document.IsArray() && 
document.GetArray().Size()) {
             _cur_size = _parsed_data.set_output(_type, document);
-            _cur_offset = 0;
         }
     }
     return Status::OK();
@@ -171,22 +158,13 @@ Status VExplodeJsonArrayTableFunction::process_close() {
     return Status::OK();
 }
 
-Status VExplodeJsonArrayTableFunction::get_value_length(int64_t* length) {
-    if (_is_current_empty) {
-        *length = -1;
-    } else {
-        _parsed_data.get_value_length(_type, _cur_offset, length);
-    }
-    return Status::OK();
-}
-
-Status VExplodeJsonArrayTableFunction::get_value(void** output) {
-    if (_is_current_empty) {
-        *output = nullptr;
+void VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column) {
+    if (current_empty()) {
+        column->insert_default();
     } else {
-        _parsed_data.get_value(_type, _cur_offset, output, true);
+        column->insert_data((char*)_parsed_data.get_value(_type, _cur_offset, 
true),
+                            _parsed_data.get_value_length(_type, _cur_offset));
     }
-    return Status::OK();
 }
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.h 
b/be/src/vec/exprs/table_function/vexplode_json_array.h
index ab171c7cef..9d7de170ff 100644
--- a/be/src/vec/exprs/table_function/vexplode_json_array.h
+++ b/be/src/vec/exprs/table_function/vexplode_json_array.h
@@ -63,33 +63,25 @@ struct ParsedData {
         }
     }
 
-    void get_value(ExplodeJsonArrayType type, int64_t offset, void** output, 
bool real = false) {
+    void* get_value(ExplodeJsonArrayType type, int64_t offset, bool real = 
false) {
         switch (type) {
         case ExplodeJsonArrayType::INT:
         case ExplodeJsonArrayType::DOUBLE:
-            *output = _data[offset];
-            break;
+            return _data[offset];
         case ExplodeJsonArrayType::STRING:
-            *output = _string_nulls[offset] ? nullptr
-                      : real                ? 
reinterpret_cast<void*>(_backup_string[offset].data())
-                                            : &_data_string[offset];
-            break;
+            return _string_nulls[offset] ? nullptr
+                   : real                ? 
reinterpret_cast<void*>(_backup_string[offset].data())
+                                         : &_data_string[offset];
         default:
-            CHECK(false) << type;
+            return nullptr;
         }
     }
 
-    void get_value_length(ExplodeJsonArrayType type, int64_t offset, int64_t* 
length) {
-        switch (type) {
-        case ExplodeJsonArrayType::INT:
-        case ExplodeJsonArrayType::DOUBLE:
-            break;
-        case ExplodeJsonArrayType::STRING:
-            *length = _string_nulls[offset] ? -1 : 
_backup_string[offset].size();
-            break;
-        default:
-            CHECK(false) << type;
+    int64 get_value_length(ExplodeJsonArrayType type, int64_t offset) {
+        if (type == ExplodeJsonArrayType::STRING && !_string_nulls[offset]) {
+            return _backup_string[offset].size();
         }
+        return 0;
     }
 
     int set_output(ExplodeJsonArrayType type, rapidjson::Document& document);
@@ -100,13 +92,10 @@ public:
     VExplodeJsonArrayTableFunction(ExplodeJsonArrayType type);
     ~VExplodeJsonArrayTableFunction() override = default;
 
-    Status process_init(vectorized::Block* block) override;
+    Status process_init(Block* block) override;
     Status process_row(size_t row_idx) override;
     Status process_close() override;
-    Status get_value(void** output) override;
-    Status get_value_length(int64_t* length) override;
-
-    Status reset() override;
+    void get_value(MutableColumnPtr& column) override;
 
 private:
     ParsedData _parsed_data;
diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.cpp 
b/be/src/vec/exprs/table_function/vexplode_numbers.cpp
index 1c01653ed2..d1a5eb07dc 100644
--- a/be/src/vec/exprs/table_function/vexplode_numbers.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_numbers.cpp
@@ -18,6 +18,9 @@
 #include "vec/exprs/table_function/vexplode_numbers.h"
 
 #include "common/status.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/columns_number.h"
 #include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
@@ -26,7 +29,7 @@ VExplodeNumbersTableFunction::VExplodeNumbersTableFunction() {
     _fn_name = "vexplode_numbers";
 }
 
-Status VExplodeNumbersTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeNumbersTableFunction::process_init(Block* block) {
     CHECK(_vexpr_context->root()->children().size() == 1)
             << "VExplodeSplitTableFunction must be have 1 children but have "
             << _vexpr_context->root()->children().size();
@@ -35,24 +38,38 @@ Status 
VExplodeNumbersTableFunction::process_init(vectorized::Block* block) {
     
RETURN_IF_ERROR(_vexpr_context->root()->children()[0]->execute(_vexpr_context, 
block,
                                                                    
&value_column_idx));
     _value_column = block->get_by_position(value_column_idx).column;
+    if (is_column_const(*_value_column)) {
+        _cur_size = 0;
+        auto& column_nested = assert_cast<const 
ColumnConst&>(*_value_column).get_data_column_ptr();
+        if (column_nested->is_nullable()) {
+            if (!column_nested->is_null_at(0)) {
+                _cur_size = static_cast<const 
ColumnNullable*>(column_nested.get())
+                                    ->get_nested_column()
+                                    .get_int(0);
+            }
+        } else {
+            _cur_size = column_nested->get_int(0);
+        }
 
+        if (_cur_size && _cur_size <= block->rows()) { // avoid 
elements_column too big or empty
+            _is_const = true;                          // use const optimize
+            for (int i = 0; i < _cur_size; i++) {
+                ((ColumnInt32*)_elements_column.get())->insert_value(i);
+            }
+        }
+    }
     return Status::OK();
 }
 
 Status VExplodeNumbersTableFunction::process_row(size_t row_idx) {
-    _is_current_empty = false;
-    _eos = false;
+    RETURN_IF_ERROR(TableFunction::process_row(row_idx));
+    if (_is_const) {
+        return Status::OK();
+    }
 
     StringRef value = _value_column->get_data_at(row_idx);
-
-    if (value.data == nullptr) {
-        _is_current_empty = true;
-        _cur_size = 0;
-        _cur_offset = 0;
-    } else {
-        _cur_size = *reinterpret_cast<const int*>(value.data);
-        _cur_offset = 0;
-        _is_current_empty = (_cur_size <= 0);
+    if (value.data != nullptr) {
+        _cur_size = std::max(0, *reinterpret_cast<const int*>(value.data));
     }
     return Status::OK();
 }
@@ -62,28 +79,21 @@ Status VExplodeNumbersTableFunction::process_close() {
     return Status::OK();
 }
 
-Status VExplodeNumbersTableFunction::reset() {
-    _eos = false;
-    _cur_offset = 0;
-    return Status::OK();
-}
-
-Status VExplodeNumbersTableFunction::get_value(void** output) {
-    if (_is_current_empty) {
-        *output = nullptr;
+void VExplodeNumbersTableFunction::get_value(MutableColumnPtr& column) {
+    if (current_empty()) {
+        column->insert_default();
     } else {
-        *output = &_cur_offset;
+        if (_is_nullable) {
+            static_cast<ColumnInt32*>(
+                    
static_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
+                    ->insert_value(_cur_offset);
+            static_cast<ColumnUInt8*>(
+                    
static_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+                    ->insert_default();
+        } else {
+            static_cast<ColumnInt32*>(column.get())->insert_value(_cur_offset);
+        }
     }
-    return Status::OK();
-}
-
-Status VExplodeNumbersTableFunction::get_value_length(int64_t* length) {
-    if (_is_current_empty) {
-        *length = -1;
-    } else {
-        *length = sizeof(int);
-    }
-    return Status::OK();
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.h 
b/be/src/vec/exprs/table_function/vexplode_numbers.h
index e125e5d5b8..3e93471b47 100644
--- a/be/src/vec/exprs/table_function/vexplode_numbers.h
+++ b/be/src/vec/exprs/table_function/vexplode_numbers.h
@@ -25,17 +25,37 @@ namespace doris::vectorized {
 class VExplodeNumbersTableFunction : public TableFunction {
 public:
     VExplodeNumbersTableFunction();
-    virtual ~VExplodeNumbersTableFunction() = default;
+    ~VExplodeNumbersTableFunction() override = default;
 
-    virtual Status process_init(vectorized::Block* block) override;
-    virtual Status process_row(size_t row_idx) override;
-    virtual Status process_close() override;
-    virtual Status reset() override;
-    virtual Status get_value(void** output) override;
-    virtual Status get_value_length(int64_t* length) override;
+    Status process_init(Block* block) override;
+    Status process_row(size_t row_idx) override;
+    Status process_close() override;
+    void get_value(MutableColumnPtr& column) override;
+    int get_value(MutableColumnPtr& column, int max_step) override {
+        if (_is_const) {
+            max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
+            if (_is_nullable) {
+                static_cast<ColumnInt32*>(
+                        
static_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
+                        ->insert_many_from(*_elements_column, _cur_offset, 
max_step);
+                static_cast<ColumnUInt8*>(
+                        
static_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+                        ->insert_many_defaults(max_step);
+            } else {
+                static_cast<ColumnInt32*>(column.get())
+                        ->insert_many_from(*_elements_column, _cur_offset, 
max_step);
+            }
+
+            forward(max_step);
+            return max_step;
+        }
+
+        return TableFunction::get_value(column, max_step);
+    }
 
 private:
     ColumnPtr _value_column;
+    ColumnPtr _elements_column = ColumnInt32::create();
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_split.cpp 
b/be/src/vec/exprs/table_function/vexplode_split.cpp
index 1bceffeeba..d2438d8c9a 100644
--- a/be/src/vec/exprs/table_function/vexplode_split.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_split.cpp
@@ -32,15 +32,7 @@ Status VExplodeSplitTableFunction::open() {
     return Status::OK();
 }
 
-Status VExplodeSplitTableFunction::reset() {
-    _eos = false;
-    if (!_is_current_empty) {
-        _cur_offset = 0;
-    }
-    return Status::OK();
-}
-
-Status VExplodeSplitTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeSplitTableFunction::process_init(Block* block) {
     CHECK(_vexpr_context->root()->children().size() == 2)
             << "VExplodeSplitTableFunction must be have 2 children but have "
             << _vexpr_context->root()->children().size();
@@ -77,14 +69,9 @@ Status 
VExplodeSplitTableFunction::process_init(vectorized::Block* block) {
 }
 
 Status VExplodeSplitTableFunction::process_row(size_t row_idx) {
-    _is_current_empty = false;
-    _eos = false;
+    RETURN_IF_ERROR(TableFunction::process_row(row_idx));
 
-    if ((_test_null_map and _test_null_map[row_idx]) || _delimiter.data == 
nullptr) {
-        _is_current_empty = true;
-        _cur_size = 0;
-        _cur_offset = 0;
-    } else {
+    if (!(_test_null_map && _test_null_map[row_idx]) && _delimiter.data != 
nullptr) {
         // TODO: use the function to be better string_view/StringRef split
         auto split = [](std::string_view strv, std::string_view delims = " ") {
             std::vector<std::string_view> output;
@@ -113,8 +100,6 @@ Status VExplodeSplitTableFunction::process_row(size_t 
row_idx) {
         _backup = split(_real_text_column->get_data_at(row_idx), _delimiter);
 
         _cur_size = _backup.size();
-        _cur_offset = 0;
-        _is_current_empty = (_cur_size == 0);
     }
     return Status::OK();
 }
@@ -127,22 +112,13 @@ Status VExplodeSplitTableFunction::process_close() {
     return Status::OK();
 }
 
-Status VExplodeSplitTableFunction::get_value(void** output) {
-    if (_is_current_empty) {
-        *output = nullptr;
+void VExplodeSplitTableFunction::get_value(MutableColumnPtr& column) {
+    if (current_empty()) {
+        column->insert_default();
     } else {
-        *output = const_cast<char*>(_backup[_cur_offset].data());
+        column->insert_data(const_cast<char*>(_backup[_cur_offset].data()),
+                            _backup[_cur_offset].length());
     }
-    return Status::OK();
-}
-
-Status VExplodeSplitTableFunction::get_value_length(int64_t* length) {
-    if (_is_current_empty) {
-        *length = -1;
-    } else {
-        *length = _backup[_cur_offset].length();
-    }
-    return Status::OK();
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_split.h 
b/be/src/vec/exprs/table_function/vexplode_split.h
index 53935b6a0b..5881ef9b6e 100644
--- a/be/src/vec/exprs/table_function/vexplode_split.h
+++ b/be/src/vec/exprs/table_function/vexplode_split.h
@@ -30,12 +30,10 @@ public:
     ~VExplodeSplitTableFunction() override = default;
 
     Status open() override;
-    Status process_init(vectorized::Block* block) override;
+    Status process_init(Block* block) override;
     Status process_row(size_t row_idx) override;
     Status process_close() override;
-    Status get_value(void** output) override;
-    Status get_value_length(int64_t* length) override;
-    Status reset() override;
+    void get_value(MutableColumnPtr& column) override;
 
 private:
     std::vector<std::string_view> _backup;
diff --git a/be/test/vec/function/function_test_util.cpp 
b/be/test/vec/function/function_test_util.cpp
index e1551fe77f..da975a2c84 100644
--- a/be/test/vec/function/function_test_util.cpp
+++ b/be/test/vec/function/function_test_util.cpp
@@ -335,6 +335,9 @@ Block* process_table_function(TableFunction* fn, Block* 
input_block,
 
     // prepare output column
     vectorized::MutableColumnPtr column = descs[0].data_type->create_column();
+    if (column->is_nullable()) {
+        fn->set_nullable();
+    }
 
     // process table function for all rows
     for (size_t row = 0; row < input_block->rows(); ++row) {
@@ -348,25 +351,10 @@ Block* process_table_function(TableFunction* fn, Block* 
input_block,
             continue;
         }
 
-        bool tmp_eos = false;
         do {
-            void* cell = nullptr;
-            int64_t cell_len = 0;
-            if (fn->get_value(&cell) != Status::OK() ||
-                fn->get_value_length(&cell_len) != Status::OK()) {
-                LOG(WARNING) << "TableFunction get_value or get_value_length 
failed";
-                return nullptr;
-            }
-
-            // copy data from input block
-            if (cell == nullptr) {
-                column->insert_default();
-            } else {
-                column->insert_data(reinterpret_cast<char*>(cell), cell_len);
-            }
-
-            fn->forward(&tmp_eos);
-        } while (!tmp_eos);
+            fn->get_value(column);
+            fn->forward();
+        } while (!fn->eos());
     }
 
     std::unique_ptr<Block> output_block(new Block());
diff --git 
a/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy 
b/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy
index 2b2b0619fd..a3a66c68fd 100644
--- a/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy
+++ b/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy
@@ -111,7 +111,7 @@ suite ("test_varchar_schema_change") {
         logger.info(res[2][1])
         assertEquals(res[2][1].toLowerCase(),"varchar(30)")
 
-        qt_sc " select * from ${tableName} order by 1; "
+        qt_sc " select * from ${tableName} order by 1,2; "
 
         // test { //没捕获到异常
         //     sql """ insert into ${tableName} 
values(92,'2017-12-01',483647,'sdafdsaf') """
@@ -140,7 +140,7 @@ suite ("test_varchar_schema_change") {
         logger.info(res[2][1])
         assertEquals(res[2][1].toLowerCase(),"varchar(30)")
 
-        qt_sc " select * from ${tableName} where c2 like '%1%' order by 1; "
+        qt_sc " select * from ${tableName} where c2 like '%1%' order by 1,2; "
 
         sql """ insert into ${tableName} 
values(22,'2011-12-01','12f2','fdsaf') """
         sql """ insert into ${tableName} 
values(55,'2009-11-21','12d1d113','123aa') """
@@ -196,9 +196,9 @@ suite ("test_varchar_schema_change") {
                 } while (running)
         }
 
-        qt_sc " select * from ${tableName} order by 1; "
-        qt_sc " select min(c2),max(c2) from ${tableName} order by 1; "
-        qt_sc " select min(c2),max(c2) from ${tableName} group by c0 order by 
1; "
+        qt_sc " select * from ${tableName} order by 1,2; "
+        qt_sc " select min(c2),max(c2) from ${tableName} order by 1,2; "
+        qt_sc " select min(c2),max(c2) from ${tableName} group by c0 order by 
1,2; "
 
         sleep(5000)
         sql """ alter table ${tableName} 
@@ -222,7 +222,7 @@ suite ("test_varchar_schema_change") {
         logger.info(res[2][1])
         assertEquals(res[2][1].toLowerCase(),"varchar(40)")
 
-        qt_sc " select * from ${tableName} order by 1; "
+        qt_sc " select * from ${tableName} order by 1,2; "
 
         // test{
         //     sql """ alter table t0 modify column c1 varchar(20) NOT NULL """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to