zhangstar333 commented on code in PR #44365:
URL: https://github.com/apache/doris/pull/44365#discussion_r1870542919


##########
be/src/vec/exprs/lambda_function/varray_map_function.cpp:
##########
@@ -42,6 +46,17 @@ namespace doris::vectorized {
 
 class VExprContext;
 
+struct LambdaArgs {
+    std::vector<int> output_slot_ref_indexs;
+    int64_t current_row_idx = 0;
+    int64_t current_offset_in_array = 0;
+    size_t array_start = 0;
+    int64_t cur_size = 0;
+    const ColumnArray::Offsets64* offsets_ptr = nullptr;
+    int current_repeat_times = 0;
+    bool eos = false;

Review Comment:
   Maybe we could change its name? It seems this means the current row's eos.



##########
be/src/vec/exprs/lambda_function/varray_map_function.cpp:
##########
@@ -42,6 +46,17 @@ namespace doris::vectorized {
 
 class VExprContext;
 
+struct LambdaArgs {

Review Comment:
   Maybe we could add a comment for each variable.



##########
be/src/vec/exprs/lambda_function/varray_map_function.cpp:
##########
@@ -113,75 +151,172 @@ class ArrayMapFunction : public LambdaFunction {
                 const auto& off_data = assert_cast<const 
ColumnArray::ColumnOffsets&>(
                         col_array.get_offsets_column());
                 array_column_offset = 
off_data.clone_resized(col_array.get_offsets_column().size());
+                args.offsets_ptr = &col_array.get_offsets();
             } else {
                 // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from 
array_test2;
                 // c_array1: [0,1,2,3,4,5,6,7,8,9]
                 const auto& array_offsets =
                         assert_cast<const 
ColumnArray::ColumnOffsets&>(*first_array_offsets)
-                                .get_data();
+                        .get_data();
                 if (nested_array_column_rows != 
col_array.get_data_ptr()->size() ||
                     (!array_offsets.empty() &&
                      memcmp(array_offsets.data(), 
col_array.get_offsets().data(),
                             sizeof(array_offsets[0]) * array_offsets.size()) 
!= 0)) {
                     return Status::InvalidArgument(
-                            "in array map function, the input column size "
-                            "are "
-                            "not equal completely, nested column data rows 1st 
size is {}, {}th "
-                            "size is {}.",
-                            nested_array_column_rows, i + 1, 
col_array.get_data_ptr()->size());
+                        "in array map function, the input column size "
+                        "are "
+                        "not equal completely, nested column data rows 1st 
size is {}, {}th "
+                        "size is {}.",
+                        nested_array_column_rows, i + 1, 
col_array.get_data_ptr()->size());
                 }
             }
-
-            // insert the data column to the new block
-            ColumnWithTypeAndName data_column {col_array.get_data_ptr(), 
col_type.get_nested_type(),
-                                               "R" + 
array_column_type_name.name};
-            lambda_block.insert(std::move(data_column));
+            lambda_datas[i] = col_array.get_data_ptr();
+            names.push_back("R" + array_column_type_name.name);
+            data_types.push_back(col_type.get_nested_type());
         }
 
-        //3. child[0]->execute(new_block)
-        RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, 
result_column_id));
+        ColumnPtr result_col = nullptr;
+        DataTypePtr res_type;
+        std::string res_name;
+
+        //process first row
+        args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1];
+        args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - 
args.array_start;
+
+        while (args.current_row_idx < block->rows()) {
+            Block lambda_block;
+            for (int i = 0; i < names.size(); i++) {
+                ColumnWithTypeAndName data_column {data_types[i], names[i]};
+                lambda_block.insert(std::move(data_column));
+            }
+
+            MutableBlock m_lambda_block(&lambda_block);
+            MutableColumns& columns = m_lambda_block.mutable_columns();
+            while (columns[gap]->size() < batch_size) {
+                long max_step = batch_size - columns[gap]->size();
+                long current_step = std::min(max_step, (long)(args.cur_size - 
args.current_offset_in_array));
+                size_t pos = args.array_start + args.current_offset_in_array;
+                for (int i = 0; i < arguments.size(); ++i) {
+                    columns[gap + i]->insert_range_from(*lambda_datas[i], pos, 
current_step);
+                }
+                args.current_offset_in_array += current_step;
+                args.current_repeat_times += current_step;
+                if (args.current_offset_in_array >= args.cur_size) {
+                    args.eos = true;
+                } else {
+                    _extend_data(columns, block, args, gap);

Review Comment:
   `_extend_data` seems to be called both when `args.eos` is true and when it is false.
   Why not move it outside the if/else?



##########
be/src/vec/exprs/lambda_function/varray_map_function.cpp:
##########
@@ -113,75 +151,172 @@ class ArrayMapFunction : public LambdaFunction {
                 const auto& off_data = assert_cast<const 
ColumnArray::ColumnOffsets&>(
                         col_array.get_offsets_column());
                 array_column_offset = 
off_data.clone_resized(col_array.get_offsets_column().size());
+                args.offsets_ptr = &col_array.get_offsets();
             } else {
                 // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from 
array_test2;
                 // c_array1: [0,1,2,3,4,5,6,7,8,9]
                 const auto& array_offsets =
                         assert_cast<const 
ColumnArray::ColumnOffsets&>(*first_array_offsets)
-                                .get_data();
+                        .get_data();
                 if (nested_array_column_rows != 
col_array.get_data_ptr()->size() ||
                     (!array_offsets.empty() &&
                      memcmp(array_offsets.data(), 
col_array.get_offsets().data(),
                             sizeof(array_offsets[0]) * array_offsets.size()) 
!= 0)) {
                     return Status::InvalidArgument(
-                            "in array map function, the input column size "
-                            "are "
-                            "not equal completely, nested column data rows 1st 
size is {}, {}th "
-                            "size is {}.",
-                            nested_array_column_rows, i + 1, 
col_array.get_data_ptr()->size());
+                        "in array map function, the input column size "
+                        "are "
+                        "not equal completely, nested column data rows 1st 
size is {}, {}th "
+                        "size is {}.",
+                        nested_array_column_rows, i + 1, 
col_array.get_data_ptr()->size());
                 }
             }
-
-            // insert the data column to the new block
-            ColumnWithTypeAndName data_column {col_array.get_data_ptr(), 
col_type.get_nested_type(),
-                                               "R" + 
array_column_type_name.name};
-            lambda_block.insert(std::move(data_column));
+            lambda_datas[i] = col_array.get_data_ptr();
+            names.push_back("R" + array_column_type_name.name);
+            data_types.push_back(col_type.get_nested_type());
         }
 
-        //3. child[0]->execute(new_block)
-        RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, 
result_column_id));
+        ColumnPtr result_col = nullptr;
+        DataTypePtr res_type;
+        std::string res_name;
+
+        //process first row
+        args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1];
+        args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - 
args.array_start;
+
+        while (args.current_row_idx < block->rows()) {
+            Block lambda_block;
+            for (int i = 0; i < names.size(); i++) {
+                ColumnWithTypeAndName data_column {data_types[i], names[i]};
+                lambda_block.insert(std::move(data_column));
+            }
+
+            MutableBlock m_lambda_block(&lambda_block);
+            MutableColumns& columns = m_lambda_block.mutable_columns();
+            while (columns[gap]->size() < batch_size) {
+                long max_step = batch_size - columns[gap]->size();
+                long current_step = std::min(max_step, (long)(args.cur_size - 
args.current_offset_in_array));
+                size_t pos = args.array_start + args.current_offset_in_array;
+                for (int i = 0; i < arguments.size(); ++i) {
+                    columns[gap + i]->insert_range_from(*lambda_datas[i], pos, 
current_step);
+                }
+                args.current_offset_in_array += current_step;
+                args.current_repeat_times += current_step;
+                if (args.current_offset_in_array >= args.cur_size) {
+                    args.eos = true;
+                } else {
+                    _extend_data(columns, block, args, gap);
+                }
+                if (args.eos) {
+                    _extend_data(columns, block, args, gap);
+                    args.current_row_idx++;
+                    args.current_offset_in_array = 0;
+                    if (args.current_row_idx >= block->rows()) {
+                        break;
+                    }
+                    args.eos = false;
+                    args.array_start = 
(*args.offsets_ptr)[args.current_row_idx - 1];
+                    args.cur_size = (*args.offsets_ptr)[args.current_row_idx] 
- args.array_start;
+                }
+            }
+
+            lambda_block.set_columns(std::move(columns));
 
-        auto res_col = lambda_block.get_by_position(*result_column_id)
-                               .column->convert_to_full_column_if_const();
-        auto res_type = lambda_block.get_by_position(*result_column_id).type;
-        auto res_name = lambda_block.get_by_position(*result_column_id).name;
+            //3. child[0]->execute(new_block)
+            RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, 
result_column_id));
+
+            auto res_col = lambda_block.get_by_position(*result_column_id)
+               .column->convert_to_full_column_if_const();
+            res_type = lambda_block.get_by_position(*result_column_id).type;
+            res_name = lambda_block.get_by_position(*result_column_id).name;
+            if (!result_col) {
+                result_col = std::move(res_col);
+            } else {
+                MutableColumnPtr column = (*std::move(result_col)).mutate();
+                column->insert_range_from(*res_col, 0, res_col->size());
+            }
+        }
 
         //4. get the result column after execution, reassemble it into a new 
array column, and return.
         ColumnWithTypeAndName result_arr;
         if (result_type->is_nullable()) {
             if (res_type->is_nullable()) {
                 result_arr = {ColumnNullable::create(
-                                      ColumnArray::create(res_col, 
std::move(array_column_offset)),
-                                      std::move(outside_null_map)),
-                              result_type, res_name};
+                        ColumnArray::create(result_col, 
std::move(array_column_offset)),
+                        std::move(outside_null_map)),
+                    result_type, res_name};
             } else {
                 // deal with eg: select array_map(x -> x is null, [null, 1, 
2]);
                 // need to create the nested column null map for column array
-                auto nested_null_map = ColumnUInt8::create(res_col->size(), 0);
+                auto nested_null_map = ColumnUInt8::create(result_col->size(), 
0);
                 result_arr = {
-                        ColumnNullable::create(
-                                ColumnArray::create(
-                                        ColumnNullable::create(res_col, 
std::move(nested_null_map)),
-                                        std::move(array_column_offset)),
-                                std::move(outside_null_map)),
-                        result_type, res_name};
+                    ColumnNullable::create(
+                        ColumnArray::create(
+                            ColumnNullable::create(result_col, 
std::move(nested_null_map)),
+                            std::move(array_column_offset)),
+                        std::move(outside_null_map)),
+                    result_type, res_name};
             }
         } else {
             if (res_type->is_nullable()) {
-                result_arr = {ColumnArray::create(res_col, 
std::move(array_column_offset)),
-                              result_type, res_name};
+                result_arr = {ColumnArray::create(result_col, 
std::move(array_column_offset)),
+                    result_type, res_name};
             } else {
-                auto nested_null_map = ColumnUInt8::create(res_col->size(), 0);
+                auto nested_null_map = ColumnUInt8::create(result_col->size(), 
0);
                 result_arr = {ColumnArray::create(
-                                      ColumnNullable::create(res_col, 
std::move(nested_null_map)),
-                                      std::move(array_column_offset)),
-                              result_type, res_name};
+                        ColumnNullable::create(result_col, 
std::move(nested_null_map)),
+                        std::move(array_column_offset)),
+                    result_type, res_name};
             }
         }
         block->insert(std::move(result_arr));
         *result_column_id = block->columns() - 1;
+
         return Status::OK();
     }
+
+private:
+
+    bool _contains_column_id(LambdaArgs& args, int id) {
+        const auto it = std::find(args.output_slot_ref_indexs.begin(), 
args.output_slot_ref_indexs.end(), id);
+        return it != args.output_slot_ref_indexs.end();
+    }
+
+    void _set_column_ref_column_id(VExprSPtr expr, int gap) {
+        for (const auto& child : expr->children()) {
+            if (child->is_column_ref()) {
+                auto* ref = static_cast<VColumnRef*>(child.get());
+                ref->set_gap(gap);
+            } else {
+                _set_column_ref_column_id(child, gap);
+            }
+        }
+    }
+
+    void _collect_slot_ref_column_id(VExprSPtr expr, LambdaArgs& args) {
+        for (const auto& child : expr->children()) {
+            if (child->is_slot_ref()) {
+                const auto* ref = static_cast<VSlotRef*>(child.get());
+                args.output_slot_ref_indexs.push_back(ref->column_id());
+            } else {
+                _collect_slot_ref_column_id(child, args);
+            }
+        }
+    }
+
+    void _extend_data(std::vector<MutableColumnPtr>& columns, Block* block, 
LambdaArgs& args,int size) {
+        if (!args.current_repeat_times || !size) {
+            return;
+        }
+        for (int i = 0; i < size; i++) {
+            if (_contains_column_id(args, i)) {
+                auto src_column = 
block->get_by_position(i).column->convert_to_full_column_if_const();
+                columns[i]->insert_many_from(*src_column, 
args.current_row_idx, args.current_repeat_times);
+            } else {
+                columns[i]->insert_many_defaults(args.current_repeat_times);

Review Comment:
   Is this a "temp" column? It seems there is no need to insert data.



##########
be/src/vec/exprs/lambda_function/varray_map_function.cpp:
##########
@@ -113,75 +151,172 @@ class ArrayMapFunction : public LambdaFunction {
                 const auto& off_data = assert_cast<const 
ColumnArray::ColumnOffsets&>(
                         col_array.get_offsets_column());
                 array_column_offset = 
off_data.clone_resized(col_array.get_offsets_column().size());
+                args.offsets_ptr = &col_array.get_offsets();
             } else {
                 // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from 
array_test2;
                 // c_array1: [0,1,2,3,4,5,6,7,8,9]
                 const auto& array_offsets =
                         assert_cast<const 
ColumnArray::ColumnOffsets&>(*first_array_offsets)
-                                .get_data();
+                        .get_data();
                 if (nested_array_column_rows != 
col_array.get_data_ptr()->size() ||
                     (!array_offsets.empty() &&
                      memcmp(array_offsets.data(), 
col_array.get_offsets().data(),
                             sizeof(array_offsets[0]) * array_offsets.size()) 
!= 0)) {
                     return Status::InvalidArgument(
-                            "in array map function, the input column size "
-                            "are "
-                            "not equal completely, nested column data rows 1st 
size is {}, {}th "
-                            "size is {}.",
-                            nested_array_column_rows, i + 1, 
col_array.get_data_ptr()->size());
+                        "in array map function, the input column size "
+                        "are "
+                        "not equal completely, nested column data rows 1st 
size is {}, {}th "
+                        "size is {}.",
+                        nested_array_column_rows, i + 1, 
col_array.get_data_ptr()->size());
                 }
             }
-
-            // insert the data column to the new block
-            ColumnWithTypeAndName data_column {col_array.get_data_ptr(), 
col_type.get_nested_type(),
-                                               "R" + 
array_column_type_name.name};
-            lambda_block.insert(std::move(data_column));
+            lambda_datas[i] = col_array.get_data_ptr();
+            names.push_back("R" + array_column_type_name.name);
+            data_types.push_back(col_type.get_nested_type());
         }
 
-        //3. child[0]->execute(new_block)
-        RETURN_IF_ERROR(children[0]->execute(context, &lambda_block, 
result_column_id));
+        ColumnPtr result_col = nullptr;
+        DataTypePtr res_type;
+        std::string res_name;
+
+        //process first row
+        args.array_start = (*args.offsets_ptr)[args.current_row_idx - 1];
+        args.cur_size = (*args.offsets_ptr)[args.current_row_idx] - 
args.array_start;
+
+        while (args.current_row_idx < block->rows()) {
+            Block lambda_block;
+            for (int i = 0; i < names.size(); i++) {
+                ColumnWithTypeAndName data_column {data_types[i], names[i]};
+                lambda_block.insert(std::move(data_column));
+            }
+
+            MutableBlock m_lambda_block(&lambda_block);

Review Comment:
   It seems we could use `MutableColumns Block::mutate_columns()` directly here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to