hubgeter commented on code in PR #49944:
URL: https://github.com/apache/doris/pull/49944#discussion_r2046172798
##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -1864,7 +1881,7 @@ std::string OrcReader::get_field_name_lower_case(const
orc::Type* orc_type, int
Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof));
- if (*eof) {
+ if (*eof && _profile != nullptr) {
Review Comment:
During the materialization phase, `VFileScanner` does not need `_profile`, so
`_profile` is nullptr here. Because `_orc_profile` is not initialized when
`_profile` is nullptr, I added this condition so that `_orc_profile` is not
used in that case.
##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -1156,41 +1087,216 @@ Status VFileScanner::_get_next_reader() {
return Status::InternalError("failed to init reader, err: {}",
init_status.to_string());
}
- _name_to_col_type.clear();
- _missing_cols.clear();
- RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type,
&_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
- RETURN_IF_ERROR(_generate_missing_columns());
- RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs,
_missing_col_descs));
- if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
- fmt::memory_buffer col_buf;
- for (auto& col : _missing_cols) {
- fmt::format_to(col_buf, " {}", col);
- }
- VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}",
fmt::to_string(col_buf),
- range.path);
- }
-
- _source_file_col_names.clear();
- _source_file_col_types.clear();
- _source_file_col_name_types.clear();
- if (_state->query_options().truncate_char_or_varchar_columns &&
need_to_get_parsed_schema) {
- Status status =
_cur_reader->get_parsed_schema(&_source_file_col_names,
-
&_source_file_col_types);
- if (!status.ok() && status.code() !=
TStatusCode::NOT_IMPLEMENTED_ERROR) {
- return status;
- }
- DCHECK(_source_file_col_names.size() ==
_source_file_col_types.size());
- for (int i = 0; i < _source_file_col_names.size(); ++i) {
- _source_file_col_name_types[_source_file_col_names[i]] =
&_source_file_col_types[i];
- }
- }
+
RETURN_IF_ERROR(_set_fill_or_truncate_columns(need_to_get_parsed_schema));
_cur_reader_eof = false;
break;
}
return Status::OK();
}
+Status VFileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&&
parquet_reader) {
+ const TFileRangeDesc& range = _current_range;
+ Status init_status = Status::OK();
+
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "iceberg") {
+ std::unique_ptr<IcebergParquetReader> iceberg_reader =
+ IcebergParquetReader::create_unique(std::move(parquet_reader),
_profile,
+ _state, *_params, range,
_kv_cache,
+ _io_ctx.get());
+ init_status = iceberg_reader->init_reader(
+ _file_col_names, _col_id_name_map, _colname_to_value_range,
+ _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
+ _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+ &_slot_id_to_filter_conjuncts);
+ _cur_reader = std::move(iceberg_reader);
+ } else if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "paimon") {
+ std::vector<std::string> place_holder;
+ init_status = parquet_reader->init_reader(
+ _file_col_names, place_holder, _colname_to_value_range,
+ _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
+ _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+ &_slot_id_to_filter_conjuncts);
+ std::unique_ptr<PaimonParquetReader> paimon_reader =
+ PaimonParquetReader::create_unique(std::move(parquet_reader),
_profile,
+ _state, *_params, range,
_io_ctx.get());
+ RETURN_IF_ERROR(paimon_reader->init_row_filters());
+ _cur_reader = std::move(paimon_reader);
+ } else {
+ bool hive_parquet_use_column_names = true;
+
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "hive" && _state !=
nullptr)
+ [[likely]] {
+ hive_parquet_use_column_names =
+ _state->query_options().hive_parquet_use_column_names;
+ }
+
+ std::vector<std::string> place_holder;
+ init_status = parquet_reader->init_reader(
+ _file_col_names, place_holder, _colname_to_value_range,
+ _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
+ _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+ &_slot_id_to_filter_conjuncts, true,
hive_parquet_use_column_names);
+ _cur_reader = std::move(parquet_reader);
+ }
+ return init_status;
+}
+
+
+Status VFileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader)
{
+ const TFileRangeDesc& range = _current_range;
+ Status init_status = Status::OK();
+
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "transactional_hive") {
+ std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
+ TransactionalHiveReader::create_unique(std::move(orc_reader),
_profile,
+ _state, *_params, range,
+ _io_ctx.get());
+ init_status = tran_orc_reader->init_reader(
+ _file_col_names, _colname_to_value_range, _push_down_conjuncts,
+ _real_tuple_desc, _default_val_row_desc.get(),
+ &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
+ RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
+ _cur_reader = std::move(tran_orc_reader);
+ } else if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "iceberg") {
+ std::unique_ptr<IcebergOrcReader> iceberg_reader =
+ IcebergOrcReader::create_unique(std::move(orc_reader),
_profile, _state,
+ *_params, range, _kv_cache,
_io_ctx.get());
+
+ init_status = iceberg_reader->init_reader(
+ _file_col_names, _col_id_name_map, _colname_to_value_range,
+ _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
+ _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+ &_slot_id_to_filter_conjuncts);
+ _cur_reader = std::move(iceberg_reader);
+ } else if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "paimon") {
+ init_status = orc_reader->init_reader(
+ &_file_col_names, _colname_to_value_range,
_push_down_conjuncts, false,
+ _real_tuple_desc, _default_val_row_desc.get(),
+ &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
+ std::unique_ptr<PaimonOrcReader> paimon_reader =
PaimonOrcReader::create_unique(
+ std::move(orc_reader), _profile, _state, *_params, range,
_io_ctx.get());
+ RETURN_IF_ERROR(paimon_reader->init_row_filters());
+ _cur_reader = std::move(paimon_reader);
+ } else {
+ bool hive_orc_use_column_names = true;
+
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "hive" && _state !=
nullptr)
+ [[likely]] {
+ hive_orc_use_column_names =
_state->query_options().hive_orc_use_column_names;
+ }
+ init_status = orc_reader->init_reader(
+ &_file_col_names, _colname_to_value_range,
_push_down_conjuncts, false,
+ _real_tuple_desc, _default_val_row_desc.get(),
+ &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts,
+ hive_orc_use_column_names);
+ _cur_reader = std::move(orc_reader);
+ }
+
+ return init_status;
+}
+
+Status VFileScanner::_set_fill_or_truncate_columns(bool
need_to_get_parsed_schema) {
+ _name_to_col_type.clear();
+ _missing_cols.clear();
+ RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type,
&_missing_cols));
+ RETURN_IF_ERROR(_generate_missing_columns());
+ RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs,
_missing_col_descs));
+ if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
+ fmt::memory_buffer col_buf;
+ for (auto& col : _missing_cols) {
+ fmt::format_to(col_buf, " {}", col);
+ }
+ VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}",
fmt::to_string(col_buf),
+ _current_range.path);
+ }
+
+ RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
+ return Status::OK();
+}
+
+Status VFileScanner::_generate_truncate_columns(bool
need_to_get_parsed_schema) {
+ _source_file_col_names.clear();
+ _source_file_col_types.clear();
+ _source_file_col_name_types.clear();
+ if (_state->query_options().truncate_char_or_varchar_columns &&
need_to_get_parsed_schema) {
+ Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
+
&_source_file_col_types);
+ if (!status.ok() && status.code() !=
TStatusCode::NOT_IMPLEMENTED_ERROR) {
+ return status;
+ }
+ DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
+ for (int i = 0; i < _source_file_col_names.size(); ++i) {
+ _source_file_col_name_types[_source_file_col_names[i]] =
&_source_file_col_types[i];
+ }
+ }
+ return Status::OK();
+}
+
+
+Status VFileScanner::read_one_line_from_current_range(segment_v2::rowid_t
rowid, Block* result_block,
+ ExternalFileMappingInfo
external_info) {
+ const TFileRangeDesc& range = _current_range;
+
+ RETURN_IF_ERROR(_init_io_ctx());
Review Comment:
Fixed. `prepare_for_read_one_line` is only called once.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]