This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new ed527a7d9c6 branch-3.1: [fix](tvf) support compressed json file for tvf and refactor code #51983 (#52581) ed527a7d9c6 is described below commit ed527a7d9c6bd161619dbe74bab0731ecb3d3b92 Author: Socrates <suyit...@selectdb.com> AuthorDate: Sat Jul 5 20:26:34 2025 +0800 branch-3.1: [fix](tvf) support compressed json file for tvf and refactor code #51983 (#52581) bp: #51983 --- be/src/olap/push_handler.cpp | 1 - be/src/service/internal_service.cpp | 7 +- be/src/vec/exec/format/avro/avro_jni_reader.cpp | 5 +- be/src/vec/exec/format/avro/avro_jni_reader.h | 4 +- be/src/vec/exec/format/csv/csv_reader.cpp | 77 ++++++++++----------- be/src/vec/exec/format/csv/csv_reader.h | 7 +- be/src/vec/exec/format/generic_reader.h | 6 ++ be/src/vec/exec/format/json/new_json_reader.cpp | 13 ++-- be/src/vec/exec/format/json/new_json_reader.h | 1 + be/src/vec/exec/format/orc/vorc_reader.cpp | 9 ++- be/src/vec/exec/format/orc/vorc_reader.h | 2 + be/src/vec/exec/format/parquet/vparquet_reader.cpp | 22 +++--- be/src/vec/exec/format/parquet/vparquet_reader.h | 4 +- be/src/vec/exec/format/table/iceberg_reader.cpp | 6 +- be/src/vec/exec/format/wal/wal_reader.h | 2 + be/src/vec/exec/scan/vfile_scanner.cpp | 12 +--- be/src/vec/exec/scan/vfile_scanner.h | 1 - .../exec/format/parquet/parquet_reader_test.cpp | 4 -- .../json_format_test/simple_object_json.json.gz | Bin 0 -> 211 bytes .../data/external_table_p0/tvf/test_hdfs_tvf.out | Bin 40945 -> 41176 bytes .../external_table_p0/tvf/test_hdfs_tvf.groovy | 10 +++ 21 files changed, 105 insertions(+), 88 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 8cfe6a1a7f9..4233b79e865 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -643,7 +643,6 @@ Status PushBrokerReader::_get_next_reader() { const_cast<cctz::time_zone*>(&_runtime_state->timezone_obj()), _io_ctx.get(), _runtime_state.get()); - RETURN_IF_ERROR(parquet_reader->open()); std::vector<std::string> place_holder; init_status = parquet_reader->init_reader( _all_col_names, place_holder, _colname_to_value_range, _push_down_exprs, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ec5e0b7adf0..3cb999fea1b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -856,7 +856,6 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr case TFileFormatType::FORMAT_AVRO: { reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range, file_slots); - st = ((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader(); break; } default: @@ -865,6 +864,12 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr st.to_protobuf(result->mutable_status()); return; } + if (!st.ok()) { + LOG(WARNING) << "failed to create reader, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + st = reader->init_schema_reader(); if (!st.ok()) { LOG(WARNING) << "failed to init reader, errmsg=" << st; st.to_protobuf(result->mutable_status()); diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index 6591abab58d..d6c38730f9f 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -54,7 +54,7 @@ Status AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor return Status::OK(); } -Status AvroJNIReader::init_fetch_table_reader( +Status AvroJNIReader::init_reader( const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; std::ostringstream required_fields; @@ -107,7 +107,8 @@ TFileType::type AvroJNIReader::get_file_type() { return type; } -Status AvroJNIReader::init_fetch_table_schema_reader() { +// open the jni connector for parsing schema +Status AvroJNIReader::init_schema_reader() { std::map<String, String> required_param = {{"uri", _range.path}, {"file_type", std::to_string(get_file_type())}, {"is_get_table_schema", "true"}}; diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h b/be/src/vec/exec/format/avro/avro_jni_reader.h index c8d55cf58cf..7daaa232f64 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.h +++ b/be/src/vec/exec/format/avro/avro_jni_reader.h @@ -70,12 +70,12 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; - Status init_fetch_table_reader( + Status init_reader( const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); TFileType::type get_file_type(); - Status init_fetch_table_schema_reader(); + Status init_schema_reader() override; Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 34a789cf854..6ce31b59561 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -396,14 +396,45 @@ Status CsvReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n return Status::OK(); } +// init decompressor, file reader and line reader for parsing schema +Status CsvReader::init_schema_reader() { + _start_offset = _range.start_offset; + if (_start_offset != 0) { + return Status::InvalidArgument( + "start offset of TFileRangeDesc must be zero in get parsered schema"); + } + if (_params.file_type == TFileType::FILE_BROKER) { + return Status::InternalError<false>( + "Getting parsered schema from csv file do not support stream load and broker " + "load."); + } + + // csv file without names line and types line. + _read_line = 1; + _is_parse_name = false; + + if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && + !_params.file_attributes.header_type.empty()) { + std::string header_type = to_lower(_params.file_attributes.header_type); + if (header_type == BeConsts::CSV_WITH_NAMES) { + _is_parse_name = true; + } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { + _read_line = 2; + _is_parse_name = true; + } + } + + RETURN_IF_ERROR(_init_options()); + RETURN_IF_ERROR(_create_file_reader(true)); + RETURN_IF_ERROR(_create_decompressor()); + RETURN_IF_ERROR(_create_line_reader()); + return Status::OK(); +} + Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) { - size_t read_line = 0; - bool is_parse_name = false; - RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name)); - - if (read_line == 1) { - if (!is_parse_name) { //parse csv file without names and types + if (_read_line == 1) { + if (!_is_parse_name) { //parse csv file without names and types size_t col_nums = 0; RETURN_IF_ERROR(_parse_col_nums(&col_nums)); for (size_t i = 0; i < col_nums; ++i) { @@ -708,40 +739,6 @@ void CsvReader::_split_line(const Slice& line) { _fields_splitter->split_line(line, &_split_values); } -Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { - _start_offset = _range.start_offset; - if (_start_offset != 0) { - return Status::InvalidArgument( - "start offset of TFileRangeDesc must be zero in get parsered schema"); - } - if (_params.file_type == TFileType::FILE_BROKER) { - return Status::InternalError<false>( - "Getting parsered schema from csv file do not support stream load and broker " - "load."); - } - - // csv file without names line and types line. - *read_line = 1; - *is_parse_name = false; - - if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && - !_params.file_attributes.header_type.empty()) { - std::string header_type = to_lower(_params.file_attributes.header_type); - if (header_type == BeConsts::CSV_WITH_NAMES) { - *is_parse_name = true; - } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { - *read_line = 2; - *is_parse_name = true; - } - } - - RETURN_IF_ERROR(_init_options()); - RETURN_IF_ERROR(_create_file_reader(true)); - RETURN_IF_ERROR(_create_decompressor()); - RETURN_IF_ERROR(_create_line_reader()); - return Status::OK(); -} - Status CsvReader::_parse_col_nums(size_t* col_nums) { const uint8_t* ptr = nullptr; size_t size = 0; diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 1f060d18ac3..117b5058c52 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -182,6 +182,7 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; + Status init_schema_reader() override; // get schema of csv file from first one line or first two lines. // if file format is FORMAT_CSV_DEFLATE and if // 1. header_type is empty, get schema from first line. @@ -231,9 +232,6 @@ private: void _init_system_properties(); void _init_file_description(); - // used for parse table schema of csv file. - // Currently, this feature is for table valued function. - Status _prepare_parse(size_t* read_line, bool* is_parse_name); Status _parse_col_nums(size_t* col_nums); Status _parse_col_names(std::vector<std::string>* col_names); // TODO(ftw): parse type @@ -263,6 +261,9 @@ private: // True if this is a load task bool _is_load = false; bool _line_reader_eof; + // For schema reader + size_t _read_line = 0; + bool _is_parse_name = false; TFileFormatType::type _file_format_type; bool _is_proto_format; TFileCompressType::type _file_compress_type; diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index e32928e4b95..c853cae15a6 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -45,6 +45,12 @@ public: return Status::NotSupported("get_columns is not implemented"); } + // This method is responsible for initializing the resource for parsing schema. + // It will be called before `get_parsed_schema`. + virtual Status init_schema_reader() { + return Status::NotSupported("init_schema_reader is not implemented for this reader."); + } + // `col_types` is always nullable to process illegal values. virtual Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) { return Status::NotSupported("get_parsed_schema is not implemented for this reader."); diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index c8969c6d4c3..173bb2b4dc4 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -255,18 +255,23 @@ Status NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor return Status::OK(); } -Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) { +// init decompressor, file reader and line reader for parsing schema +Status NewJsonReader::init_schema_reader() { RETURN_IF_ERROR(_get_range_params()); - + // create decompressor. + // _decompressor may be nullptr if this is not a compressed file + RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); RETURN_IF_ERROR(_open_file_reader(true)); if (_read_json_by_line) { RETURN_IF_ERROR(_open_line_reader()); } - // generate _parsed_jsonpaths and _parsed_json_root RETURN_IF_ERROR(_parse_jsonpath_and_json_root()); + return Status::OK(); +} +Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { bool eof = false; const uint8_t* json_str = nullptr; std::unique_ptr<uint8_t[]> json_str_ptr; diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 31ddc0fa9c9..967a5300529 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -93,6 +93,7 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; + Status init_schema_reader() override; Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 35ff0c7561c..2c10c9ff29c 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -359,10 +359,14 @@ Status OrcReader::init_reader( return Status::OK(); } +// init file reader for parsing schema +Status OrcReader::init_schema_reader() { + return _create_file_reader(); +} + Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) { - RETURN_IF_ERROR(_create_file_reader()); - auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); + const auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { col_names->emplace_back(get_field_name_lower_case(&root_type, i)); col_types->emplace_back(convert_to_doris_type(root_type.getSubtype(i))); @@ -374,7 +378,6 @@ Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_na std::vector<int32_t>* col_attributes, const std::string& attribute, bool* exist_attribute) { - RETURN_IF_ERROR(_create_file_reader()); *exist_attribute = true; auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 3e2b785cf03..fc4fba4789c 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -177,6 +177,8 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; + Status init_schema_reader() override; + Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index a38031a668f..1b48f04066e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -259,12 +259,6 @@ const FieldDescriptor ParquetReader::get_file_metadata_schema() { return _file_metadata->schema(); } -Status ParquetReader::open() { - RETURN_IF_ERROR(_open_file()); - _t_metadata = &(_file_metadata->to_thrift()); - return Status::OK(); -} - void ParquetReader::_init_system_properties() { if (_scan_range.__isset.file_type) { // for compatibility @@ -311,10 +305,8 @@ Status ParquetReader::init_reader( _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; _colname_to_value_range = colname_to_value_range; _hive_use_column_names = hive_use_column_names; - if (_file_metadata == nullptr) { - return Status::InternalError("failed to init parquet reader, please open reader first"); - } - + RETURN_IF_ERROR(_open_file()); + _t_metadata = &(_file_metadata->to_thrift()); SCOPED_RAW_TIMER(&_statistics.parse_meta_time); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { @@ -491,11 +483,15 @@ Status ParquetReader::set_fill_columns( return Status::OK(); } -Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names, - std::vector<TypeDescriptor>* col_types) { +// init file reader and file metadata for parsing schema +Status ParquetReader::init_schema_reader() { RETURN_IF_ERROR(_open_file()); - _t_metadata = &_file_metadata->to_thrift(); + _t_metadata = &(_file_metadata->to_thrift()); + return Status::OK(); +} +Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) { _total_groups = _t_metadata->row_groups.size(); auto schema_desc = _file_metadata->schema(); for (int i = 0; i < schema_desc.size(); ++i) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index e24071093b6..d189343e82e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -107,8 +107,6 @@ public: // for test void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; } - Status open(); - Status init_reader( const std::vector<std::string>& all_column_names, const std::vector<std::string>& missing_column_names, @@ -134,6 +132,8 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; + Status init_schema_reader() override; + Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index c297904ca41..cd1bded9eac 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -160,6 +160,7 @@ Status IcebergTableReader::_equality_delete_base( delete_desc.file_size = -1; std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc); if (!init_schema) { + RETURN_IF_ERROR(delete_reader->init_schema_reader()); RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names, &equality_delete_col_types)); _generate_equality_delete_block(&_equality_delete_block, equality_delete_col_names, @@ -167,7 +168,6 @@ Status IcebergTableReader::_equality_delete_base( init_schema = true; } if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) { - RETURN_IF_ERROR(parquet_reader->open()); RETURN_IF_ERROR(parquet_reader->init_reader(equality_delete_col_names, not_in_file_col_names, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false)); @@ -446,8 +446,6 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d ParquetReader parquet_delete_reader( _profile, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE, const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state); - - RETURN_IF_ERROR(parquet_delete_reader.open()); RETURN_IF_ERROR(parquet_delete_reader.init_reader(delete_file_col_names, {}, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false)); @@ -542,6 +540,7 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete Status IcebergParquetReader::get_file_col_id_to_name( bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) { auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get()); + RETURN_IF_ERROR(parquet_reader->init_schema_reader()); FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema(); if (field_desc.has_parquet_field_id()) { @@ -561,6 +560,7 @@ Status IcebergOrcReader::get_file_col_id_to_name( std::vector<std::string> col_names; std::vector<int32_t> col_ids; + RETURN_IF_ERROR(orc_reader->init_schema_reader()); RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute( &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema)); if (!exist_schema) { diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index 5834d74efea..8da5e74aa1d 100644 --- a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -24,6 +24,8 @@ namespace doris { namespace vectorized { struct ScannerCounter; class WalReader : public GenericReader { + ENABLE_FACTORY_CREATOR(WalReader); + public: WalReader(RuntimeState* state); ~WalReader() override = default; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 52850d6e8e2..fb1910bded6 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -129,8 +129,6 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju RETURN_IF_ERROR(VScanner::prepare(state, conjuncts)); _get_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1); - _open_reader_timer = - ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerOpenReaderTime", 1); _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerCastInputBlockTime", 1); _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), @@ -942,10 +940,6 @@ Status VFileScanner::_get_next_reader() { // ATTN: the push down agg type may be set back to NONE, // see IcebergTableReader::init_row_filters for example. parquet_reader->set_push_down_agg_type(_get_push_down_agg_type()); - { - SCOPED_TIMER(_open_reader_timer); - RETURN_IF_ERROR(parquet_reader->open()); - } if (push_down_predicates) { RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } @@ -1110,12 +1104,12 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_AVRO: { _cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs, range); - init_status = ((AvroJNIReader*)(_cur_reader.get())) - ->init_fetch_table_reader(_colname_to_value_range); + init_status = + ((AvroJNIReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range); break; } case TFileFormatType::FORMAT_WAL: { - _cur_reader.reset(new WalReader(_state)); + _cur_reader = WalReader::create_unique(_state); init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc); break; } diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 5b1604209d4..6a605da3dca 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -173,7 +173,6 @@ protected: private: RuntimeProfile::Counter* _get_block_timer = nullptr; - RuntimeProfile::Counter* _open_reader_timer = nullptr; RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr; RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr; RuntimeProfile::Counter* _pre_filter_timer = nullptr; diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp index 423adfd41ce..afa4d7f9d5f 100644 --- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp @@ -150,7 +150,6 @@ TEST_F(ParquetReaderTest, normal) { runtime_state.set_desc_tbl(desc_tbl); std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range; - static_cast<void>(p_reader->open()); static_cast<void>(p_reader->init_reader(column_names, missing_column_names, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr)); std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> @@ -231,7 +230,6 @@ TEST_F(ParquetReaderTest, use_column_name) { colname_to_value_range.emplace("smallint_col", ColumnValueRange<TYPE_SMALLINT>("smallint_col")); colname_to_value_range.emplace("int_col", ColumnValueRange<TYPE_INT>("int_col")); - static_cast<void>(p_reader->open()); static_cast<void>(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false, use_column_name)); @@ -271,7 +269,6 @@ TEST_F(ParquetReaderTest, use_column_name2) { colname_to_value_range.emplace("smallint_col", ColumnValueRange<TYPE_SMALLINT>("smallint_col")); colname_to_value_range.emplace("int_col", ColumnValueRange<TYPE_INT>("int_col")); - static_cast<void>(p_reader->open()); static_cast<void>(p_reader->init_reader(table_column_names, {"boolean_col"}, &colname_to_value_range, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false, use_column_name)); @@ -314,7 +311,6 @@ TEST_F(ParquetReaderTest, use_column_idx) { colname_to_value_range.emplace("col3", ColumnValueRange<TYPE_SMALLINT>("col3")); colname_to_value_range.emplace("col102", ColumnValueRange<TYPE_SMALLINT>("col102")); - static_cast<void>(p_reader->open()); static_cast<void>(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false, use_column_name)); diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz new file mode 100644 index 00000000000..8a6db90241f Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz differ diff --git a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out index a8f5dcf5396..04ec58cdbae 100644 Binary files a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out and b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out differ diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy index 74cb1e320aa..8bc8194843d 100644 --- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy @@ -143,6 +143,16 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { "strip_outer_array" = "false", "read_json_by_line" = "true") order by id; """ + uri = "${defaultFS}" + "/user/doris/preinstalled_data/json_format_test/simple_object_json.json.gz" + format = "json" + qt_json_compressed """ select * from HDFS( + "uri" = "${uri}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}", + "compress_type" = "GZ", + "strip_outer_array" = "false", + "read_json_by_line" = "true") order by id; """ + uri = "${defaultFS}" + "/user/doris/preinstalled_data/json_format_test/simple_object_json.json" format = "json" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org