This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ed527a7d9c6 branch-3.1: [fix](tvf) support compressed json file for 
tvf and refactor code #51983 (#52581)
ed527a7d9c6 is described below

commit ed527a7d9c6bd161619dbe74bab0731ecb3d3b92
Author: Socrates <suyit...@selectdb.com>
AuthorDate: Sat Jul 5 20:26:34 2025 +0800

    branch-3.1: [fix](tvf) support compressed json file for tvf and refactor 
code #51983 (#52581)
    
    bp: #51983
---
 be/src/olap/push_handler.cpp                       |   1 -
 be/src/service/internal_service.cpp                |   7 +-
 be/src/vec/exec/format/avro/avro_jni_reader.cpp    |   5 +-
 be/src/vec/exec/format/avro/avro_jni_reader.h      |   4 +-
 be/src/vec/exec/format/csv/csv_reader.cpp          |  77 ++++++++++-----------
 be/src/vec/exec/format/csv/csv_reader.h            |   7 +-
 be/src/vec/exec/format/generic_reader.h            |   6 ++
 be/src/vec/exec/format/json/new_json_reader.cpp    |  13 ++--
 be/src/vec/exec/format/json/new_json_reader.h      |   1 +
 be/src/vec/exec/format/orc/vorc_reader.cpp         |   9 ++-
 be/src/vec/exec/format/orc/vorc_reader.h           |   2 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  22 +++---
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   4 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |   6 +-
 be/src/vec/exec/format/wal/wal_reader.h            |   2 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |  12 +---
 be/src/vec/exec/scan/vfile_scanner.h               |   1 -
 .../exec/format/parquet/parquet_reader_test.cpp    |   4 --
 .../json_format_test/simple_object_json.json.gz    | Bin 0 -> 211 bytes
 .../data/external_table_p0/tvf/test_hdfs_tvf.out   | Bin 40945 -> 41176 bytes
 .../external_table_p0/tvf/test_hdfs_tvf.groovy     |  10 +++
 21 files changed, 105 insertions(+), 88 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 8cfe6a1a7f9..4233b79e865 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -643,7 +643,6 @@ Status PushBrokerReader::_get_next_reader() {
                         
const_cast<cctz::time_zone*>(&_runtime_state->timezone_obj()),
                         _io_ctx.get(), _runtime_state.get());
 
-        RETURN_IF_ERROR(parquet_reader->open());
         std::vector<std::string> place_holder;
         init_status = parquet_reader->init_reader(
                 _all_col_names, place_holder, _colname_to_value_range, 
_push_down_exprs,
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index ec5e0b7adf0..3cb999fea1b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -856,7 +856,6 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
         case TFileFormatType::FORMAT_AVRO: {
             reader = vectorized::AvroJNIReader::create_unique(profile.get(), 
params, range,
                                                               file_slots);
-            st = 
((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader();
             break;
         }
         default:
@@ -865,6 +864,12 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
             st.to_protobuf(result->mutable_status());
             return;
         }
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to create reader, errmsg=" << st;
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        st = reader->init_schema_reader();
         if (!st.ok()) {
             LOG(WARNING) << "failed to init reader, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp 
b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index 6591abab58d..d6c38730f9f 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -54,7 +54,7 @@ Status 
AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-Status AvroJNIReader::init_fetch_table_reader(
+Status AvroJNIReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
     _colname_to_value_range = colname_to_value_range;
     std::ostringstream required_fields;
@@ -107,7 +107,8 @@ TFileType::type AvroJNIReader::get_file_type() {
     return type;
 }
 
-Status AvroJNIReader::init_fetch_table_schema_reader() {
+// open the jni connector for parsing schema
+Status AvroJNIReader::init_schema_reader() {
     std::map<String, String> required_param = {{"uri", _range.path},
                                                {"file_type", 
std::to_string(get_file_type())},
                                                {"is_get_table_schema", 
"true"}};
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h 
b/be/src/vec/exec/format/avro/avro_jni_reader.h
index c8d55cf58cf..7daaa232f64 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.h
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.h
@@ -70,12 +70,12 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
-    Status init_fetch_table_reader(
+    Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
     TFileType::type get_file_type();
 
-    Status init_fetch_table_schema_reader();
+    Status init_schema_reader() override;
 
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<TypeDescriptor>* col_types) override;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 34a789cf854..6ce31b59561 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -396,14 +396,45 @@ Status 
CsvReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n
     return Status::OK();
 }
 
+// init decompressor, file reader and line reader for parsing schema
+Status CsvReader::init_schema_reader() {
+    _start_offset = _range.start_offset;
+    if (_start_offset != 0) {
+        return Status::InvalidArgument(
+                "start offset of TFileRangeDesc must be zero in get parsered 
schema");
+    }
+    if (_params.file_type == TFileType::FILE_BROKER) {
+        return Status::InternalError<false>(
+                "Getting parsered schema from csv file do not support stream 
load and broker "
+                "load.");
+    }
+
+    // csv file without names line and types line.
+    _read_line = 1;
+    _is_parse_name = false;
+
+    if (_params.__isset.file_attributes && 
_params.file_attributes.__isset.header_type &&
+        !_params.file_attributes.header_type.empty()) {
+        std::string header_type = 
to_lower(_params.file_attributes.header_type);
+        if (header_type == BeConsts::CSV_WITH_NAMES) {
+            _is_parse_name = true;
+        } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+            _read_line = 2;
+            _is_parse_name = true;
+        }
+    }
+
+    RETURN_IF_ERROR(_init_options());
+    RETURN_IF_ERROR(_create_file_reader(true));
+    RETURN_IF_ERROR(_create_decompressor());
+    RETURN_IF_ERROR(_create_line_reader());
+    return Status::OK();
+}
+
 Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<TypeDescriptor>* col_types) {
-    size_t read_line = 0;
-    bool is_parse_name = false;
-    RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name));
-
-    if (read_line == 1) {
-        if (!is_parse_name) { //parse csv file without names and types
+    if (_read_line == 1) {
+        if (!_is_parse_name) { //parse csv file without names and types
             size_t col_nums = 0;
             RETURN_IF_ERROR(_parse_col_nums(&col_nums));
             for (size_t i = 0; i < col_nums; ++i) {
@@ -708,40 +739,6 @@ void CsvReader::_split_line(const Slice& line) {
     _fields_splitter->split_line(line, &_split_values);
 }
 
-Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
-    _start_offset = _range.start_offset;
-    if (_start_offset != 0) {
-        return Status::InvalidArgument(
-                "start offset of TFileRangeDesc must be zero in get parsered 
schema");
-    }
-    if (_params.file_type == TFileType::FILE_BROKER) {
-        return Status::InternalError<false>(
-                "Getting parsered schema from csv file do not support stream 
load and broker "
-                "load.");
-    }
-
-    // csv file without names line and types line.
-    *read_line = 1;
-    *is_parse_name = false;
-
-    if (_params.__isset.file_attributes && 
_params.file_attributes.__isset.header_type &&
-        !_params.file_attributes.header_type.empty()) {
-        std::string header_type = 
to_lower(_params.file_attributes.header_type);
-        if (header_type == BeConsts::CSV_WITH_NAMES) {
-            *is_parse_name = true;
-        } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
-            *read_line = 2;
-            *is_parse_name = true;
-        }
-    }
-
-    RETURN_IF_ERROR(_init_options());
-    RETURN_IF_ERROR(_create_file_reader(true));
-    RETURN_IF_ERROR(_create_decompressor());
-    RETURN_IF_ERROR(_create_line_reader());
-    return Status::OK();
-}
-
 Status CsvReader::_parse_col_nums(size_t* col_nums) {
     const uint8_t* ptr = nullptr;
     size_t size = 0;
diff --git a/be/src/vec/exec/format/csv/csv_reader.h 
b/be/src/vec/exec/format/csv/csv_reader.h
index 1f060d18ac3..117b5058c52 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -182,6 +182,7 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
+    Status init_schema_reader() override;
     // get schema of csv file from first one line or first two lines.
     // if file format is FORMAT_CSV_DEFLATE and if
     // 1. header_type is empty, get schema from first line.
@@ -231,9 +232,6 @@ private:
     void _init_system_properties();
     void _init_file_description();
 
-    // used for parse table schema of csv file.
-    // Currently, this feature is for table valued function.
-    Status _prepare_parse(size_t* read_line, bool* is_parse_name);
     Status _parse_col_nums(size_t* col_nums);
     Status _parse_col_names(std::vector<std::string>* col_names);
     // TODO(ftw): parse type
@@ -263,6 +261,9 @@ private:
     // True if this is a load task
     bool _is_load = false;
     bool _line_reader_eof;
+    // For schema reader
+    size_t _read_line = 0;
+    bool _is_parse_name = false;
     TFileFormatType::type _file_format_type;
     bool _is_proto_format;
     TFileCompressType::type _file_compress_type;
diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index e32928e4b95..c853cae15a6 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -45,6 +45,12 @@ public:
         return Status::NotSupported("get_columns is not implemented");
     }
 
+    // This method is responsible for initializing the resource for parsing 
schema.
+    // It will be called before `get_parsed_schema`.
+    virtual Status init_schema_reader() {
+        return Status::NotSupported("init_schema_reader is not implemented for 
this reader.");
+    }
+    // `col_types` is always nullable to process illegal values.
     virtual Status get_parsed_schema(std::vector<std::string>* col_names,
                                      std::vector<TypeDescriptor>* col_types) {
         return Status::NotSupported("get_parsed_schema is not implemented for 
this reader.");
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index c8969c6d4c3..173bb2b4dc4 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -255,18 +255,23 @@ Status 
NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
-                                        std::vector<TypeDescriptor>* 
col_types) {
+// init decompressor, file reader and line reader for parsing schema
+Status NewJsonReader::init_schema_reader() {
     RETURN_IF_ERROR(_get_range_params());
-
+    // create decompressor.
+    // _decompressor may be nullptr if this is not a compressed file
+    RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, 
&_decompressor));
     RETURN_IF_ERROR(_open_file_reader(true));
     if (_read_json_by_line) {
         RETURN_IF_ERROR(_open_line_reader());
     }
-
     // generate _parsed_jsonpaths and _parsed_json_root
     RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+    return Status::OK();
+}
 
+Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                        std::vector<TypeDescriptor>* 
col_types) {
     bool eof = false;
     const uint8_t* json_str = nullptr;
     std::unique_ptr<uint8_t[]> json_str_ptr;
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index 31ddc0fa9c9..967a5300529 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -93,6 +93,7 @@ public:
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
+    Status init_schema_reader() override;
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<TypeDescriptor>* col_types) override;
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 35ff0c7561c..2c10c9ff29c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -359,10 +359,14 @@ Status OrcReader::init_reader(
     return Status::OK();
 }
 
+// init file reader for parsing schema
+Status OrcReader::init_schema_reader() {
+    return _create_file_reader();
+}
+
 Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<TypeDescriptor>* col_types) {
-    RETURN_IF_ERROR(_create_file_reader());
-    auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : 
_reader->getType();
+    const auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : 
_reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(get_field_name_lower_case(&root_type, i));
         
col_types->emplace_back(convert_to_doris_type(root_type.getSubtype(i)));
@@ -374,7 +378,6 @@ Status 
OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_na
                                                 std::vector<int32_t>* 
col_attributes,
                                                 const std::string& attribute,
                                                 bool* exist_attribute) {
-    RETURN_IF_ERROR(_create_file_reader());
     *exist_attribute = true;
     auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : 
_reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 3e2b785cf03..fc4fba4789c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -177,6 +177,8 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
+    Status init_schema_reader() override;
+
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<TypeDescriptor>* col_types) override;
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a38031a668f..1b48f04066e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -259,12 +259,6 @@ const FieldDescriptor 
ParquetReader::get_file_metadata_schema() {
     return _file_metadata->schema();
 }
 
-Status ParquetReader::open() {
-    RETURN_IF_ERROR(_open_file());
-    _t_metadata = &(_file_metadata->to_thrift());
-    return Status::OK();
-}
-
 void ParquetReader::_init_system_properties() {
     if (_scan_range.__isset.file_type) {
         // for compatibility
@@ -311,10 +305,8 @@ Status ParquetReader::init_reader(
     _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
     _colname_to_value_range = colname_to_value_range;
     _hive_use_column_names = hive_use_column_names;
-    if (_file_metadata == nullptr) {
-        return Status::InternalError("failed to init parquet reader, please 
open reader first");
-    }
-
+    RETURN_IF_ERROR(_open_file());
+    _t_metadata = &(_file_metadata->to_thrift());
     SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
     _total_groups = _t_metadata->row_groups.size();
     if (_total_groups == 0) {
@@ -491,11 +483,15 @@ Status ParquetReader::set_fill_columns(
     return Status::OK();
 }
 
-Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
-                                        std::vector<TypeDescriptor>* 
col_types) {
+// init file reader and file metadata for parsing schema
+Status ParquetReader::init_schema_reader() {
     RETURN_IF_ERROR(_open_file());
-    _t_metadata = &_file_metadata->to_thrift();
+    _t_metadata = &(_file_metadata->to_thrift());
+    return Status::OK();
+}
 
+Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                        std::vector<TypeDescriptor>* 
col_types) {
     _total_groups = _t_metadata->row_groups.size();
     auto schema_desc = _file_metadata->schema();
     for (int i = 0; i < schema_desc.size(); ++i) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index e24071093b6..d189343e82e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -107,8 +107,6 @@ public:
     // for test
     void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = 
file_reader; }
 
-    Status open();
-
     Status init_reader(
             const std::vector<std::string>& all_column_names,
             const std::vector<std::string>& missing_column_names,
@@ -134,6 +132,8 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
+    Status init_schema_reader() override;
+
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<TypeDescriptor>* col_types) override;
 
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index c297904ca41..cd1bded9eac 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -160,6 +160,7 @@ Status IcebergTableReader::_equality_delete_base(
         delete_desc.file_size = -1;
         std::unique_ptr<GenericReader> delete_reader = 
_create_equality_reader(delete_desc);
         if (!init_schema) {
+            RETURN_IF_ERROR(delete_reader->init_schema_reader());
             
RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names,
                                                              
&equality_delete_col_types));
             _generate_equality_delete_block(&_equality_delete_block, 
equality_delete_col_names,
@@ -167,7 +168,6 @@ Status IcebergTableReader::_equality_delete_base(
             init_schema = true;
         }
         if (auto* parquet_reader = 
typeid_cast<ParquetReader*>(delete_reader.get())) {
-            RETURN_IF_ERROR(parquet_reader->open());
             
RETURN_IF_ERROR(parquet_reader->init_reader(equality_delete_col_names,
                                                         not_in_file_col_names, 
nullptr, {}, nullptr,
                                                         nullptr, nullptr, 
nullptr, nullptr, false));
@@ -446,8 +446,6 @@ Status IcebergParquetReader 
::_read_position_delete_file(const TFileRangeDesc* d
     ParquetReader parquet_delete_reader(
             _profile, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE,
             const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, 
_state);
-
-    RETURN_IF_ERROR(parquet_delete_reader.open());
     RETURN_IF_ERROR(parquet_delete_reader.init_reader(delete_file_col_names, 
{}, nullptr, {},
                                                       nullptr, nullptr, 
nullptr, nullptr, nullptr,
                                                       false));
@@ -542,6 +540,7 @@ Status IcebergOrcReader::_read_position_delete_file(const 
TFileRangeDesc* delete
 Status IcebergParquetReader::get_file_col_id_to_name(
         bool& exist_schema, std::map<int32_t, std::string>& 
file_col_id_to_name) {
     auto* parquet_reader = 
static_cast<ParquetReader*>(_file_format_reader.get());
+    RETURN_IF_ERROR(parquet_reader->init_schema_reader());
     FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
 
     if (field_desc.has_parquet_field_id()) {
@@ -561,6 +560,7 @@ Status IcebergOrcReader::get_file_col_id_to_name(
 
     std::vector<std::string> col_names;
     std::vector<int32_t> col_ids;
+    RETURN_IF_ERROR(orc_reader->init_schema_reader());
     RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
             &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
     if (!exist_schema) {
diff --git a/be/src/vec/exec/format/wal/wal_reader.h 
b/be/src/vec/exec/format/wal/wal_reader.h
index 5834d74efea..8da5e74aa1d 100644
--- a/be/src/vec/exec/format/wal/wal_reader.h
+++ b/be/src/vec/exec/format/wal/wal_reader.h
@@ -24,6 +24,8 @@ namespace doris {
 namespace vectorized {
 struct ScannerCounter;
 class WalReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(WalReader);
+
 public:
     WalReader(RuntimeState* state);
     ~WalReader() override = default;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 52850d6e8e2..fb1910bded6 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -129,8 +129,6 @@ Status VFileScanner::prepare(RuntimeState* state, const 
VExprContextSPtrs& conju
     RETURN_IF_ERROR(VScanner::prepare(state, conjuncts));
     _get_block_timer =
             ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), 
"FileScannerGetBlockTime", 1);
-    _open_reader_timer =
-            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), 
"FileScannerOpenReaderTime", 1);
     _cast_to_input_block_timer = 
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
                                                       
"FileScannerCastInputBlockTime", 1);
     _fill_missing_columns_timer = 
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
@@ -942,10 +940,6 @@ Status VFileScanner::_get_next_reader() {
             // ATTN: the push down agg type may be set back to NONE,
             // see IcebergTableReader::init_row_filters for example.
             parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
-            {
-                SCOPED_TIMER(_open_reader_timer);
-                RETURN_IF_ERROR(parquet_reader->open());
-            }
             if (push_down_predicates) {
                 RETURN_IF_ERROR(_process_late_arrival_conjuncts());
             }
@@ -1110,12 +1104,12 @@ Status VFileScanner::_get_next_reader() {
         case TFileFormatType::FORMAT_AVRO: {
             _cur_reader = AvroJNIReader::create_unique(_state, _profile, 
*_params, _file_slot_descs,
                                                        range);
-            init_status = ((AvroJNIReader*)(_cur_reader.get()))
-                                  
->init_fetch_table_reader(_colname_to_value_range);
+            init_status =
+                    
((AvroJNIReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
             break;
         }
         case TFileFormatType::FORMAT_WAL: {
-            _cur_reader.reset(new WalReader(_state));
+            _cur_reader = WalReader::create_unique(_state);
             init_status = 
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
             break;
         }
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index 5b1604209d4..6a605da3dca 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -173,7 +173,6 @@ protected:
 
 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;
-    RuntimeProfile::Counter* _open_reader_timer = nullptr;
     RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
     RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
     RuntimeProfile::Counter* _pre_filter_timer = nullptr;
diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
index 423adfd41ce..afa4d7f9d5f 100644
--- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
@@ -150,7 +150,6 @@ TEST_F(ParquetReaderTest, normal) {
     runtime_state.set_desc_tbl(desc_tbl);
 
     std::unordered_map<std::string, ColumnValueRangeType> 
colname_to_value_range;
-    static_cast<void>(p_reader->open());
     static_cast<void>(p_reader->init_reader(column_names, 
missing_column_names, nullptr, {},
                                             nullptr, nullptr, nullptr, 
nullptr, nullptr));
     std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>
@@ -231,7 +230,6 @@ TEST_F(ParquetReaderTest, use_column_name) {
     colname_to_value_range.emplace("smallint_col", 
ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
     colname_to_value_range.emplace("int_col", 
ColumnValueRange<TYPE_INT>("int_col"));
 
-    static_cast<void>(p_reader->open());
     static_cast<void>(p_reader->init_reader(table_column_names, {}, 
&colname_to_value_range, {},
                                             nullptr, nullptr, nullptr, 
nullptr, nullptr, false,
                                             use_column_name));
@@ -271,7 +269,6 @@ TEST_F(ParquetReaderTest, use_column_name2) {
     colname_to_value_range.emplace("smallint_col", 
ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
     colname_to_value_range.emplace("int_col", 
ColumnValueRange<TYPE_INT>("int_col"));
 
-    static_cast<void>(p_reader->open());
     static_cast<void>(p_reader->init_reader(table_column_names, 
{"boolean_col"},
                                             &colname_to_value_range, {}, 
nullptr, nullptr, nullptr,
                                             nullptr, nullptr, false, 
use_column_name));
@@ -314,7 +311,6 @@ TEST_F(ParquetReaderTest, use_column_idx) {
     colname_to_value_range.emplace("col3", 
ColumnValueRange<TYPE_SMALLINT>("col3"));
     colname_to_value_range.emplace("col102", 
ColumnValueRange<TYPE_SMALLINT>("col102"));
 
-    static_cast<void>(p_reader->open());
     static_cast<void>(p_reader->init_reader(table_column_names, {}, 
&colname_to_value_range, {},
                                             nullptr, nullptr, nullptr, 
nullptr, nullptr, false,
                                             use_column_name));
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz
 
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz
new file mode 100644
index 00000000000..8a6db90241f
Binary files /dev/null and 
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz
 differ
diff --git a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out 
b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
index a8f5dcf5396..04ec58cdbae 100644
Binary files a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out and 
b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out differ
diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy 
b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
index 74cb1e320aa..8bc8194843d 100644
--- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
@@ -143,6 +143,16 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") 
{
                         "strip_outer_array" = "false",
                         "read_json_by_line" = "true") order by id; """
 
+            uri = "${defaultFS}" + 
"/user/doris/preinstalled_data/json_format_test/simple_object_json.json.gz"
+            format = "json"
+            qt_json_compressed """ select * from HDFS(
+                        "uri" = "${uri}",
+                        "hadoop.username" = "${hdfsUserName}",
+                        "format" = "${format}",
+                        "compress_type" = "GZ",
+                        "strip_outer_array" = "false",
+                        "read_json_by_line" = "true") order by id; """
+
 
            uri = "${defaultFS}" + 
"/user/doris/preinstalled_data/json_format_test/simple_object_json.json"
             format = "json"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to