morningman commented on code in PR #12793:
URL: https://github.com/apache/doris/pull/12793#discussion_r975524151
##########
be/src/vec/exec/scan/new_file_scan_node.h:
##########
@@ -25,6 +25,8 @@ class NewFileScanNode : public VScanNode {
public:
NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
Review Comment:
Why override `init`?
##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -69,24 +81,84 @@ Status VFileScanner::open(RuntimeState* state) {
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
eof) {
if (_cur_reader == nullptr || _cur_reader_eof) {
- _get_next_reader();
+ RETURN_IF_ERROR(_get_next_reader());
}
if (!_scanner_eof) {
- _cur_reader->get_next_block(block, &_cur_reader_eof);
+ RETURN_IF_ERROR(_init_src_block(block));
Review Comment:
Add a comment describing all of these steps.
It would be better to include an example to explain them.
##########
be/src/vec/exec/format/generic_reader.h:
##########
@@ -28,6 +28,17 @@ class Block;
class GenericReader {
public:
virtual Status get_next_block(Block* block, bool* eof) = 0;
+ virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type()
{
+ std::unordered_map<std::string, TypeDescriptor> map;
+ return map;
+ }
+ virtual Status init_reader(const TupleDescriptor* tuple_desc,
Review Comment:
`init_reader` may not be suitable as an interface method.
##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -112,8 +184,127 @@ Status
VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r
return Status::OK();
}
+Status VFileScanner::_convert_to_output_block(Block* block) {
+ if (_src_block_ptr == block) {
+ return Status::OK();
+ }
+
+ block->clear();
+
+ int ctx_idx = 0;
+ size_t rows = _src_block.rows();
+ auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
+ auto& filter_map = filter_column->get_data();
+ auto origin_column_num = _src_block.columns();
+
+ for (auto slot_desc : _output_tuple_desc->slots()) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+
+ int dest_index = ctx_idx++;
+
+ auto* ctx = _dest_vexpr_ctx[dest_index];
+ int result_column_id = -1;
+ // PT1 => dest primitive type
+ RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
+ bool is_origin_column = result_column_id < origin_column_num;
+ auto column_ptr =
+ is_origin_column && _src_block_mem_reuse
+ ?
_src_block.get_by_position(result_column_id).column->clone_resized(rows)
+ : _src_block.get_by_position(result_column_id).column;
+
+ DCHECK(column_ptr != nullptr);
+
+ // because of src_slot_desc is always be nullable, so the column_ptr
after do dest_expr
+ // is likely to be nullable
+ if (LIKELY(column_ptr->is_nullable())) {
+ auto nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ for (int i = 0; i < rows; ++i) {
+ if (filter_map[i] && nullable_column->is_null_at(i)) {
+ if (_strict_mode &&
(_src_slot_descs_order_by_dest[dest_index]) &&
+
!_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index])
+ .column->is_null_at(i)) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i,
_num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ auto raw_value =
+
_src_block.get_by_position(ctx_idx).column->get_data_at(
+ i);
+ std::string raw_string =
raw_value.to_string();
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) value is
incorrect while strict "
+ "mode is {}, "
+ "src value is {}",
+ slot_desc->col_name(),
_strict_mode, raw_string);
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ } else if (!slot_desc->is_nullable()) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i,
_num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) values is null
while columns is not "
+ "nullable",
+ slot_desc->col_name());
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ }
+ }
+ }
+ if (!slot_desc->is_nullable()) {
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ } else if (slot_desc->is_nullable()) {
+ column_ptr = vectorized::make_nullable(column_ptr);
+ }
+ block->insert(dest_index,
vectorized::ColumnWithTypeAndName(std::move(column_ptr),
+
slot_desc->get_data_type_ptr(),
+
slot_desc->col_name()));
+ }
+
+ // after do the dest block insert operation, clear _src_block to remove
the reference of origin column
+ if (_src_block_mem_reuse) {
+ _src_block.clear_column_data(origin_column_num);
+ } else {
+ _src_block.clear();
+ }
+
+ size_t dest_size = block->columns();
+ // do filter
+ block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
+
std::make_shared<vectorized::DataTypeUInt8>(),
+ "filter column"));
+ RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size,
dest_size));
+ // _counter->num_rows_filtered += rows - dest_block->rows();
+
+ return Status::OK();
+}
+
+Status VFileScanner::_pre_filter_src_block() {
+ if (_pre_conjunct_ctx_ptr) {
+ auto origin_column_num = _src_block_ptr->columns();
+ // filter block
+ // auto old_rows = _src_block_ptr->rows();
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
+ _src_block_ptr,
origin_column_num));
+ // _counter->num_rows_unselected += old_rows - _src_block.rows();
Review Comment:
fix the `counter`
##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -69,24 +81,84 @@ Status VFileScanner::open(RuntimeState* state) {
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
eof) {
if (_cur_reader == nullptr || _cur_reader_eof) {
- _get_next_reader();
+ RETURN_IF_ERROR(_get_next_reader());
}
if (!_scanner_eof) {
- _cur_reader->get_next_block(block, &_cur_reader_eof);
+ RETURN_IF_ERROR(_init_src_block(block));
+ RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr,
&_cur_reader_eof));
+ RETURN_IF_ERROR(_cast_to_input_block(block));
}
- if (block->rows() > 0) {
- _fill_columns_from_path(block, block->rows());
- // TODO: cast to String for load job.
+ if (_scanner_eof && _src_block_ptr->rows() == 0) {
+ *eof = true;
}
- if (_scanner_eof && block->rows() == 0) {
- *eof = true;
+ if (_src_block_ptr->rows() > 0) {
Review Comment:
Add comments
##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -69,24 +81,84 @@ Status VFileScanner::open(RuntimeState* state) {
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
eof) {
if (_cur_reader == nullptr || _cur_reader_eof) {
- _get_next_reader();
+ RETURN_IF_ERROR(_get_next_reader());
}
if (!_scanner_eof) {
- _cur_reader->get_next_block(block, &_cur_reader_eof);
+ RETURN_IF_ERROR(_init_src_block(block));
+ RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr,
&_cur_reader_eof));
+ RETURN_IF_ERROR(_cast_to_input_block(block));
}
- if (block->rows() > 0) {
- _fill_columns_from_path(block, block->rows());
- // TODO: cast to String for load job.
+ if (_scanner_eof && _src_block_ptr->rows() == 0) {
+ *eof = true;
}
- if (_scanner_eof && block->rows() == 0) {
- *eof = true;
+ if (_src_block_ptr->rows() > 0) {
+ _fill_columns_from_path();
+ _pre_filter_src_block();
+ _convert_to_output_block(block);
+ }
+
+ return Status::OK();
+}
+
+Status VFileScanner::_init_src_block(Block* block) {
+ if (!_is_load) {
+ _src_block_ptr = block;
+ return Status::OK();
+ }
+
+ // size_t batch_pos = 0;
Review Comment:
Remove unused code.
##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -69,24 +81,84 @@ Status VFileScanner::open(RuntimeState* state) {
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
eof) {
if (_cur_reader == nullptr || _cur_reader_eof) {
- _get_next_reader();
+ RETURN_IF_ERROR(_get_next_reader());
}
if (!_scanner_eof) {
- _cur_reader->get_next_block(block, &_cur_reader_eof);
+ RETURN_IF_ERROR(_init_src_block(block));
+ RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr,
&_cur_reader_eof));
+ RETURN_IF_ERROR(_cast_to_input_block(block));
}
- if (block->rows() > 0) {
- _fill_columns_from_path(block, block->rows());
- // TODO: cast to String for load job.
+ if (_scanner_eof && _src_block_ptr->rows() == 0) {
+ *eof = true;
}
- if (_scanner_eof && block->rows() == 0) {
- *eof = true;
+ if (_src_block_ptr->rows() > 0) {
Review Comment:
wrap them with `RETURN_IF_ERROR`
##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -112,8 +184,127 @@ Status
VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r
return Status::OK();
}
+Status VFileScanner::_convert_to_output_block(Block* block) {
+ if (_src_block_ptr == block) {
+ return Status::OK();
+ }
+
+ block->clear();
+
+ int ctx_idx = 0;
+ size_t rows = _src_block.rows();
+ auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
+ auto& filter_map = filter_column->get_data();
+ auto origin_column_num = _src_block.columns();
+
+ for (auto slot_desc : _output_tuple_desc->slots()) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+
+ int dest_index = ctx_idx++;
+
+ auto* ctx = _dest_vexpr_ctx[dest_index];
+ int result_column_id = -1;
+ // PT1 => dest primitive type
+ RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
+ bool is_origin_column = result_column_id < origin_column_num;
+ auto column_ptr =
+ is_origin_column && _src_block_mem_reuse
+ ?
_src_block.get_by_position(result_column_id).column->clone_resized(rows)
+ : _src_block.get_by_position(result_column_id).column;
+
+ DCHECK(column_ptr != nullptr);
+
+ // because of src_slot_desc is always be nullable, so the column_ptr
after do dest_expr
+ // is likely to be nullable
+ if (LIKELY(column_ptr->is_nullable())) {
+ auto nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ for (int i = 0; i < rows; ++i) {
+ if (filter_map[i] && nullable_column->is_null_at(i)) {
+ if (_strict_mode &&
(_src_slot_descs_order_by_dest[dest_index]) &&
+
!_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index])
+ .column->is_null_at(i)) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i,
_num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ auto raw_value =
+
_src_block.get_by_position(ctx_idx).column->get_data_at(
+ i);
+ std::string raw_string =
raw_value.to_string();
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) value is
incorrect while strict "
+ "mode is {}, "
+ "src value is {}",
+ slot_desc->col_name(),
_strict_mode, raw_string);
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ } else if (!slot_desc->is_nullable()) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i,
_num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) values is null
while columns is not "
+ "nullable",
+ slot_desc->col_name());
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ }
+ }
+ }
+ if (!slot_desc->is_nullable()) {
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ } else if (slot_desc->is_nullable()) {
+ column_ptr = vectorized::make_nullable(column_ptr);
+ }
+ block->insert(dest_index,
vectorized::ColumnWithTypeAndName(std::move(column_ptr),
+
slot_desc->get_data_type_ptr(),
+
slot_desc->col_name()));
+ }
+
+ // after do the dest block insert operation, clear _src_block to remove
the reference of origin column
+ if (_src_block_mem_reuse) {
+ _src_block.clear_column_data(origin_column_num);
+ } else {
+ _src_block.clear();
+ }
+
+ size_t dest_size = block->columns();
+ // do filter
+ block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
+
std::make_shared<vectorized::DataTypeUInt8>(),
+ "filter column"));
+ RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size,
dest_size));
+ // _counter->num_rows_filtered += rows - dest_block->rows();
Review Comment:
The `counter` is needed for load jobs, so the commented-out counter updates need to be fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]