This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 43eb946543 [feature](table-valued-function)S3 table valued function 
supports parquet/orc/json file format #14130
43eb946543 is described below

commit 43eb9465438edd090b0249b0b8ecff6cf4b5cf54
Author: Tiewei Fang <[email protected]>
AuthorDate: Thu Nov 10 10:33:12 2022 +0800

    [feature](table-valued-function)S3 table valued function supports 
parquet/orc/json file format #14130
    
    S3 table valued function supports the parquet/orc/json file formats.
    For example: parquet format
---
 be/src/service/internal_service.cpp                |  25 +++-
 be/src/vec/exec/format/json/new_json_reader.cpp    | 130 ++++++++++++++++++++-
 be/src/vec/exec/format/json/new_json_reader.h      |   7 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  42 +++++++
 be/src/vec/exec/format/orc/vorc_reader.h           |   6 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  35 ++++++
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   6 +
 .../doris/planner/external/HiveScanProvider.java   |  25 ++--
 .../doris/planner/external/QueryScanProvider.java  |  17 +--
 .../doris/planner/external/TVFScanProvider.java    |  15 +--
 .../ExternalFileTableValuedFunction.java           | 104 ++++++++++++-----
 .../doris/tablefunction/S3TableValuedFunction.java |  35 ++----
 12 files changed, 348 insertions(+), 99 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 26301b8f4e..d15726272f 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -51,6 +51,9 @@
 #include "util/uid_util.h"
 #include "vec/exec/format/csv/csv_reader.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/orc/vorc_reader.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
 namespace doris {
@@ -426,7 +429,6 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
             return;
         }
     }
-
     if (file_scan_range.__isset.ranges == false) {
         st = Status::InternalError("can not get TFileRangeDesc.");
         st.to_protobuf(result->mutable_status());
@@ -439,8 +441,7 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
     }
     const TFileRangeDesc& range = file_scan_range.ranges.at(0);
     const TFileScanRangeParams& params = file_scan_range.params;
-    // file_slots is no use
-    std::vector<SlotDescriptor*> file_slots;
+
     std::unique_ptr<vectorized::GenericReader> reader(nullptr);
     std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
     switch (params.format_type) {
@@ -450,16 +451,32 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
     case TFileFormatType::FORMAT_CSV_LZ4FRAME:
     case TFileFormatType::FORMAT_CSV_LZOP:
     case TFileFormatType::FORMAT_CSV_DEFLATE: {
+        // file_slots is no use
+        std::vector<SlotDescriptor*> file_slots;
         reader.reset(new vectorized::CsvReader(profile.get(), params, range, 
file_slots));
         break;
     }
+    case TFileFormatType::FORMAT_PARQUET: {
+        std::vector<std::string> column_names;
+        reader.reset(new vectorized::ParquetReader(params, range, 
column_names));
+        break;
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        std::vector<std::string> column_names;
+        reader.reset(new vectorized::OrcReader(params, range, column_names, 
""));
+        break;
+    }
+    case TFileFormatType::FORMAT_JSON: {
+        std::vector<SlotDescriptor*> file_slots;
+        reader.reset(new vectorized::NewJsonReader(profile.get(), params, 
range, file_slots));
+        break;
+    }
     default:
         st = Status::InternalError("Not supported file format in fetch table 
schema: {}",
                                    params.format_type);
         st.to_protobuf(result->mutable_status());
         return;
     }
-    std::unordered_map<std::string, TypeDescriptor> name_to_col_type;
     std::vector<std::string> col_names;
     std::vector<TypeDescriptor> col_types;
     st = reader->get_parsered_schema(&col_names, &col_types);
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index a8fbeae1c1..6b60a144cf 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -49,13 +49,32 @@ NewJsonReader::NewJsonReader(RuntimeState* state, 
RuntimeProfile* profile, Scann
           _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
           _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), 
&_parse_allocator),
           _scanner_eof(scanner_eof) {
-    _file_format_type = _params.format_type;
-
     _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
     _read_timer = ADD_TIMER(_profile, "ReadTime");
     _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
 }
 
+NewJsonReader::NewJsonReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
+                             const TFileRangeDesc& range,
+                             const std::vector<SlotDescriptor*>& 
file_slot_descs)
+        : _vhandle_json_callback(nullptr),
+          _state(nullptr),
+          _profile(profile),
+          _params(params),
+          _range(range),
+          _file_slot_descs(file_slot_descs),
+          _file_reader(nullptr),
+          _file_reader_s(nullptr),
+          _real_file_reader(nullptr),
+          _line_reader(nullptr),
+          _reader_eof(false),
+          _skip_first_line(false),
+          _next_row(0),
+          _total_rows(0),
+          _value_allocator(_value_buffer, sizeof(_value_buffer)),
+          _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
+          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), 
&_parse_allocator) {}
+
 Status NewJsonReader::init_reader() {
     RETURN_IF_ERROR(_get_range_params());
 
@@ -120,6 +139,113 @@ Status 
NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
+Status NewJsonReader::get_parsered_schema(std::vector<std::string>* col_names,
+                                          std::vector<TypeDescriptor>* 
col_types) {
+    RETURN_IF_ERROR(_get_range_params());
+
+    RETURN_IF_ERROR(_open_file_reader());
+    if (_read_json_by_line) {
+        RETURN_IF_ERROR(_open_line_reader());
+    }
+
+    // generate _parsed_jsonpaths and _parsed_json_root
+    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+    bool eof = false;
+    const uint8_t* json_str = nullptr;
+    std::unique_ptr<uint8_t[]> json_str_ptr;
+    size_t size = 0;
+    if (_line_reader != nullptr) {
+        RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof));
+    } else {
+        int64_t length = 0;
+        RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, 
&length));
+        json_str = json_str_ptr.get();
+        size = length;
+        if (length == 0) {
+            eof = true;
+        }
+    }
+
+    if (size == 0 || eof) {
+        return Status::EndOfFile("Empty file.");
+    }
+
+    // clear memory here.
+    _value_allocator.Clear();
+    _parse_allocator.Clear();
+    bool has_parse_error = false;
+
+    // parse jsondata to JsonDoc
+    // As the issue: https://github.com/Tencent/rapidjson/issues/1458
+    // Now, rapidjson only support uint64_t, So lagreint load cause bug. We 
use kParseNumbersAsStringsFlag.
+    if (_num_as_string) {
+        has_parse_error =
+                
_origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, 
size)
+                        .HasParseError();
+    } else {
+        has_parse_error = _origin_json_doc.Parse((char*)json_str, 
size).HasParseError();
+    }
+
+    if (has_parse_error) {
+        return Status::DataQualityError(
+                "Parse json data for JsonDoc failed. code: {}, error info: {}",
+                _origin_json_doc.GetParseError(),
+                rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
+    }
+
+    // set json root
+    if (_parsed_json_root.size() != 0) {
+        _json_doc = JsonFunctions::get_json_object_from_parsed_json(
+                _parsed_json_root, &_origin_json_doc, 
_origin_json_doc.GetAllocator());
+        if (_json_doc == nullptr) {
+            return Status::DataQualityError("JSON Root not found.");
+        }
+    } else {
+        _json_doc = &_origin_json_doc;
+    }
+
+    if (_json_doc->IsArray() && !_strip_outer_array) {
+        return Status::DataQualityError(
+                "JSON data is array-object, `strip_outer_array` must be 
TRUE.");
+    } else if (!_json_doc->IsArray() && _strip_outer_array) {
+        return Status::DataQualityError(
+                "JSON data is not an array-object, `strip_outer_array` must be 
FALSE.");
+    }
+
+    rapidjson::Value* objectValue = nullptr;
+    if (_json_doc->IsArray()) {
+        if (_json_doc->Size() == 0) {
+            // may be passing an empty json, such as "[]"
+            return Status::InternalError("Empty first json line");
+        }
+        objectValue = &(*_json_doc)[0];
+    } else {
+        objectValue = _json_doc;
+    }
+
+    // use jsonpaths to col_names
+    if (_parsed_jsonpaths.size() > 0) {
+        for (size_t i = 0; i < _parsed_jsonpaths.size(); ++i) {
+            size_t len = _parsed_jsonpaths[i].size();
+            if (len == 0) {
+                return Status::InvalidArgument("It's invalid jsonpaths.");
+            }
+            std::string key = _parsed_jsonpaths[i][len - 1].key;
+            col_names->emplace_back(key);
+            col_types->emplace_back(TypeDescriptor::create_string_type());
+        }
+        return Status::OK();
+    }
+
+    for (int i = 0; i < objectValue->MemberCount(); ++i) {
+        auto it = objectValue->MemberBegin() + i;
+        col_names->emplace_back(it->name.GetString());
+        col_types->emplace_back(TypeDescriptor::create_string_type());
+    }
+    return Status::OK();
+}
+
 Status NewJsonReader::_get_range_params() {
     if (!_params.__isset.file_attributes) {
         return Status::InternalError("BE cat get file_attributes");
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index aee11535fb..6b003c30fe 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -39,12 +39,17 @@ public:
     NewJsonReader(RuntimeState* state, RuntimeProfile* profile, 
ScannerCounter* counter,
                   const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
                   const std::vector<SlotDescriptor*>& file_slot_descs, bool* 
scanner_eof);
+
+    NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+                  const TFileRangeDesc& range, const 
std::vector<SlotDescriptor*>& file_slot_descs);
     ~NewJsonReader() override = default;
 
     Status init_reader();
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
+    Status get_parsered_schema(std::vector<std::string>* col_names,
+                               std::vector<TypeDescriptor>* col_types) 
override;
 
 private:
     Status _get_range_params();
@@ -107,8 +112,6 @@ private:
     std::unique_ptr<LineReader> _line_reader;
     bool _reader_eof;
 
-    TFileFormatType::type _file_format_type;
-
     // When we fetch range doesn't start from 0 will always skip the first line
     bool _skip_first_line;
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 77375831f6..0cd19bdae6 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -81,6 +81,14 @@ OrcReader::OrcReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params
     _init_profile();
 }
 
+OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
+                     const std::vector<std::string>& column_names, const 
std::string& ctz)
+        : _profile(nullptr),
+          _scan_params(params),
+          _scan_range(range),
+          _ctz(ctz),
+          _column_names(column_names) {}
+
 OrcReader::~OrcReader() {
     close();
 }
@@ -166,6 +174,40 @@ Status OrcReader::init_reader(
     return Status::OK();
 }
 
+Status OrcReader::get_parsered_schema(std::vector<std::string>* col_names,
+                                      std::vector<TypeDescriptor>* col_types) {
+    if (_file_reader == nullptr) {
+        std::unique_ptr<FileReader> inner_reader;
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_scan_params, _scan_range.path,
+                                                        
_scan_range.start_offset,
+                                                        _scan_range.file_size, 
0, inner_reader));
+        RETURN_IF_ERROR(inner_reader->open());
+        _file_reader = new ORCFileInputStream(_scan_range.path, 
inner_reader.release());
+    }
+    if (_file_reader->getLength() == 0) {
+        return Status::EndOfFile("Empty orc file");
+    }
+
+    // create orc reader
+    try {
+        orc::ReaderOptions options;
+        _reader = 
orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
+    } catch (std::exception& e) {
+        return Status::InternalError("Init OrcReader failed. reason = {}", 
e.what());
+    }
+
+    if (_reader->getNumberOfRows() == 0) {
+        return Status::EndOfFile("Empty orc file");
+    }
+
+    auto& root_type = _reader->getType();
+    for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
+        col_names->emplace_back(root_type.getFieldName(i));
+        
col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i)));
+    }
+    return Status::OK();
+}
+
 Status OrcReader::_init_read_columns() {
     auto& root_type = _reader->getType();
     std::unordered_set<std::string> orc_cols;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index c9b1d2dc45..9b0e4b6a44 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -77,6 +77,9 @@ public:
               const TFileRangeDesc& range, const std::vector<std::string>& 
column_names,
               size_t batch_size, const std::string& ctz);
 
+    OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
+              const std::vector<std::string>& column_names, const std::string& 
ctz);
+
     ~OrcReader() override;
     // for test
     void set_file_reader(const std::string& file_name, FileReader* 
file_reader) {
@@ -96,6 +99,9 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
+    Status get_parsered_schema(std::vector<std::string>* col_names,
+                               std::vector<TypeDescriptor>* col_types) 
override;
+
 private:
     struct OrcProfile {
         RuntimeProfile::Counter* read_time;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index af899fb318..bfbccbcb7c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -40,6 +40,13 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const 
TFileScanRangeParams
     _init_profile();
 }
 
+ParquetReader::ParquetReader(const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
+                             const std::vector<std::string>& column_names)
+        : _profile(nullptr),
+          _scan_params(params),
+          _scan_range(range),
+          _column_names(column_names) {}
+
 ParquetReader::~ParquetReader() {
     close();
 }
@@ -182,6 +189,34 @@ std::unordered_map<std::string, TypeDescriptor> 
ParquetReader::get_name_to_type(
     return map;
 }
 
+Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names,
+                                          std::vector<TypeDescriptor>* 
col_types) {
+    if (_file_reader == nullptr) {
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_scan_params, _scan_range.path,
+                                                        
_scan_range.start_offset,
+                                                        _scan_range.file_size, 
0, _file_reader));
+    }
+    RETURN_IF_ERROR(_file_reader->open());
+    if (_file_reader->size() == 0) {
+        return Status::EndOfFile("Empty Parquet File");
+    }
+    RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
+    _t_metadata = &_file_metadata->to_thrift();
+
+    _total_groups = _t_metadata->row_groups.size();
+    if (_total_groups == 0) {
+        return Status::EndOfFile("Empty Parquet File");
+    }
+
+    auto schema_desc = _file_metadata->schema();
+    for (int i = 0; i < schema_desc.size(); ++i) {
+        // Get the Column Reader for the boolean column
+        col_names->emplace_back(schema_desc.get_column(i)->name);
+        col_types->emplace_back(schema_desc.get_column(i)->type);
+    }
+    return Status::OK();
+}
+
 Status ParquetReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
                                   std::unordered_set<std::string>* 
missing_cols) {
     const auto& schema_desc = _file_metadata->schema();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 50a8505652..22a99df22f 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -54,6 +54,9 @@ public:
                   const TFileRangeDesc& range, const std::vector<std::string>& 
column_names,
                   size_t batch_size, cctz::time_zone* ctz);
 
+    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
+                  const std::vector<std::string>& column_names);
+
     ~ParquetReader() override;
     // for test
     void set_file_reader(FileReader* file_reader) { 
_file_reader.reset(file_reader); }
@@ -71,6 +74,9 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
+    Status get_parsered_schema(std::vector<std::string>* col_names,
+                               std::vector<TypeDescriptor>* col_types) 
override;
+
     Statistics& statistics() { return _statistics; }
 
 private:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 9fd59fc43d..a5e8a7aabc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -33,9 +33,11 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.external.hive.util.HiveUtil;
 import org.apache.doris.load.BrokerFileGroup;
 import 
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
+import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TFileScanSlotInfo;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;
 
 import com.google.common.collect.Lists;
@@ -270,21 +272,16 @@ public class HiveScanProvider extends 
HMSTableScanProvider {
     }
 
     @Override
-    public String getColumnSeparator() throws UserException {
-        return hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
-                .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER);
+    public TFileAttributes getFileAttributes() throws UserException {
+        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+        
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+                .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
+        textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
+        TFileAttributes fileAttributes = new TFileAttributes();
+        fileAttributes.setTextParams(textParams);
+        fileAttributes.setHeaderType("");
+        return fileAttributes;
     }
-
-    @Override
-    public String getLineSeparator() {
-        return DEFAULT_LINE_DELIMITER;
-    }
-
-    @Override
-    public String getHeaderType() {
-        return "";
-    }
-
 }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 4b865560d7..eae1829603 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -29,7 +29,6 @@ import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.THdfsParams;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -52,11 +51,7 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
     private int inputSplitNum = 0;
     private long inputFileSize = 0;
 
-    public abstract String getColumnSeparator() throws UserException;
-
-    public abstract String getLineSeparator();
-
-    public abstract String getHeaderType();
+    public abstract TFileAttributes getFileAttributes() throws UserException;
 
     @Override
     public void createScanRangeLocations(ParamCreateContext context, 
BackendPolicy backendPolicy,
@@ -78,14 +73,8 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
             context.params.setFileType(locationType);
             TFileFormatType fileFormatType = getFileFormatType();
             context.params.setFormatType(getFileFormatType());
-            if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
-                TFileTextScanRangeParams textParams = new 
TFileTextScanRangeParams();
-                textParams.setColumnSeparator(getColumnSeparator());
-                textParams.setLineDelimiter(getLineSeparator());
-                TFileAttributes fileAttributes = new TFileAttributes();
-                fileAttributes.setTextParams(textParams);
-                fileAttributes.setHeaderType(getHeaderType());
-                context.params.setFileAttributes(fileAttributes);
+            if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || 
fileFormatType == TFileFormatType.FORMAT_JSON) {
+                context.params.setFileAttributes(getFileAttributes());
             }
 
             // set hdfs params for hdfs file type.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
index 88627cef71..8c8bdf9d30 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
@@ -31,6 +31,7 @@ import org.apache.doris.load.BrokerFileGroup;
 import 
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TFileScanSlotInfo;
@@ -59,18 +60,8 @@ public class TVFScanProvider extends QueryScanProvider {
 
     // =========== implement abstract methods of QueryScanProvider 
=================
     @Override
-    public String getColumnSeparator() throws UserException {
-        return tableValuedFunction.getColumnSeparator();
-    }
-
-    @Override
-    public String getLineSeparator() {
-        return tableValuedFunction.getLineSeparator();
-    }
-
-    @Override
-    public String getHeaderType() {
-        return tableValuedFunction.getHeaderType();
+    public TFileAttributes getFileAttributes() throws UserException {
+        return tableValuedFunction.getFileAttributes();
     }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index d6b6af0ce0..ac69cad8de 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.planner.PlanNodeId;
@@ -65,8 +66,15 @@ import java.util.concurrent.Future;
  */
 public abstract class ExternalFileTableValuedFunction extends 
TableValuedFunctionIf {
     public static final Logger LOG = 
LogManager.getLogger(ExternalFileTableValuedFunction.class);
-    public static final String DEFAULT_COLUMN_SEPARATOR = ",";
-    public static final String DEFAULT_LINE_DELIMITER = "\n";
+    protected static final String DEFAULT_COLUMN_SEPARATOR = ",";
+    protected static final String DEFAULT_LINE_DELIMITER = "\n";
+    protected static final String FORMAT = "format";
+    protected static final String COLUMN_SEPARATOR = "column_separator";
+    protected static final String LINE_DELIMITER = "line_delimiter";
+    protected static final String JSON_ROOT = "json_root";
+    protected static final String JSON_PATHS = "jsonpaths";
+    protected static final String STRIP_OUTER_ARRAY = "strip_outer_array";
+    protected static final String READ_JSON_BY_LINE = "read_json_by_line";
 
     protected List<Column> columns = null;
     protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
@@ -77,6 +85,11 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
 
     protected String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
     protected String lineDelimiter = DEFAULT_LINE_DELIMITER;
+    protected String jsonRoot = "";
+    protected String jsonPaths = "";
+    protected String stripOuterArray = "";
+    protected String readJsonByLine = "";
+
 
     public abstract TFileType getTFileType();
 
@@ -92,28 +105,77 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         return locationProperties;
     }
 
-    public String getColumnSeparator() {
-        return columnSeparator;
-    }
-
-    public String getLineSeparator() {
-        return lineDelimiter;
-    }
-
-    public String getHeaderType() {
-        return headerType;
-    }
-
-    public void parseFile() throws UserException {
+    protected void parseFile() throws UserException {
         String path = getFilePath();
         BrokerDesc brokerDesc = getBrokerDesc();
         BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
     }
 
+    protected void parseProperties(Map<String, String> validParams) throws 
UserException {
+        String formatString = validParams.getOrDefault(FORMAT, 
"").toLowerCase();
+        switch (formatString) {
+            case "csv":
+                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+                break;
+            case "csv_with_names":
+                this.headerType = FeConstants.csv_with_names;
+                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+                break;
+            case "csv_with_names_and_types":
+                this.headerType = FeConstants.csv_with_names_and_types;
+                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+                break;
+            case "parquet":
+                this.fileFormatType = TFileFormatType.FORMAT_PARQUET;
+                break;
+            case "orc":
+                this.fileFormatType = TFileFormatType.FORMAT_ORC;
+                break;
+            case "json":
+                this.fileFormatType = TFileFormatType.FORMAT_JSON;
+                break;
+            default:
+                throw new AnalysisException("format:" + formatString + " is 
not supported.");
+        }
+
+        columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, 
DEFAULT_COLUMN_SEPARATOR);
+        lineDelimiter = validParams.getOrDefault(LINE_DELIMITER, 
DEFAULT_LINE_DELIMITER);
+        jsonRoot = validParams.getOrDefault(JSON_ROOT, "");
+        jsonPaths = validParams.getOrDefault(JSON_PATHS, "");
+        stripOuterArray = validParams.getOrDefault(STRIP_OUTER_ARRAY, 
"false").toLowerCase();
+        readJsonByLine = validParams.getOrDefault(READ_JSON_BY_LINE, 
"true").toLowerCase();
+    }
+
     public List<TBrokerFileStatus> getFileStatuses() {
         return fileStatuses;
     }
 
+    public TFileAttributes getFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new 
TFileTextScanRangeParams();
+        fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
+        fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
+            fileAttributes.setHeaderType(this.headerType);
+        } else if (this.fileFormatType == TFileFormatType.FORMAT_JSON) {
+            fileAttributes.setJsonRoot(jsonRoot);
+            fileAttributes.setJsonpaths(jsonPaths);
+            if (readJsonByLine.equalsIgnoreCase("true")) {
+                fileAttributes.setReadJsonByLine(true);
+            } else {
+                fileAttributes.setReadJsonByLine(false);
+            }
+            if (stripOuterArray.equalsIgnoreCase("true")) {
+                fileAttributes.setStripOuterArray(true);
+            } else {
+                fileAttributes.setStripOuterArray(false);
+            }
+            // TODO(ftw): num_as_string/fuzzy_parser?
+        }
+        return fileAttributes;
+    }
+
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
         return new ExternalFileScanNode(id, desc);
@@ -218,16 +280,4 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         return InternalService.PFetchTableSchemaRequest.newBuilder()
                 .setFileScanRange(ByteString.copyFrom(new 
TSerializer().serialize(fileScanRange))).build();
     }
-
-    private TFileAttributes getFileAttributes() {
-        TFileAttributes fileAttributes = new TFileAttributes();
-        if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
-            TFileTextScanRangeParams fileTextScanRangeParams = new 
TFileTextScanRangeParams();
-            fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
-            fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
-            fileAttributes.setTextParams(fileTextScanRangeParams);
-            fileAttributes.setHeaderType(this.headerType);
-        }
-        return fileAttributes;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 1a615e787c..784a75accc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -20,10 +20,8 @@ package org.apache.doris.tablefunction;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.S3URI;
-import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 
 import com.google.common.collect.ImmutableSet;
@@ -40,14 +38,13 @@ import java.util.Map;
 public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
     public static final Logger LOG = 
LogManager.getLogger(S3TableValuedFunction.class);
     public static final String NAME = "s3";
-    public static final String S3_URI = "URI";
+    public static final String S3_URI = "uri";
     public static final String S3_AK = "AWS_ACCESS_KEY";
     public static final String S3_SK = "AWS_SECRET_KEY";
     public static final String S3_ENDPOINT = "AWS_ENDPOINT";
     public static final String S3_REGION = "AWS_REGION";
-    public static final String FORMAT = "FORMAT";
-    private static final String AK = "ACCESS_KEY";
-    private static final String SK = "SECRET_KEY";
+    private static final String AK = "access_key";
+    private static final String SK = "secret_key";
 
     public static final String USE_PATH_STYLE = "use_path_style";
 
@@ -56,6 +53,10 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
                         .add(AK)
                         .add(SK)
                         .add(FORMAT)
+                        .add(JSON_ROOT)
+                        .add(JSON_PATHS)
+                        .add(STRIP_OUTER_ARRAY)
+                        .add(READ_JSON_BY_LINE)
                         .build();
     private S3URI s3uri;
     private String s3AK;
@@ -64,31 +65,17 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
     public S3TableValuedFunction(Map<String, String> params) throws 
UserException {
         Map<String, String> validParams = Maps.newHashMap();
         for (String key : params.keySet()) {
-            if (!PROPERTIES_SET.contains(key.toUpperCase())) {
+            if (!PROPERTIES_SET.contains(key.toLowerCase())) {
                 throw new AnalysisException(key + " is invalid property");
             }
-            validParams.put(key.toUpperCase(), params.get(key));
+            validParams.put(key.toLowerCase(), params.get(key));
         }
 
         s3uri = S3URI.create(validParams.get(S3_URI));
         s3AK = validParams.getOrDefault(AK, "");
         s3SK = validParams.getOrDefault(SK, "");
-        String formatString = validParams.getOrDefault(FORMAT, "");
-        switch (formatString.toLowerCase()) {
-            case "csv":
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "csv_with_names":
-                this.headerType = FeConstants.csv_with_names;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "csv_with_names_and_types":
-                this.headerType = FeConstants.csv_with_names_and_types;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            default:
-                throw new AnalysisException("format:" + formatString + " is 
not supported.");
-        }
+
+        parseProperties(validParams);
 
         // set S3 location properties
         locationProperties = Maps.newHashMap();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to