This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b3e4c75410bbd28c39a430331049f61f4e5bd6ba
Author: slothever <[email protected]>
AuthorDate: Sat Aug 26 00:16:00 2023 +0800

    [fix](multi-catalog)fix hive table with cosn location issue (#23409)
    
    Sometimes the partitions of a Hive table may be on different storage,
    e.g., some on HDFS and others on object storage (COS, etc.).
    This PR mainly makes the following changes:
    
    1. Fix the bug of accessing files via cosn.
    2. Add a new field `fs_name` to TFileRangeDesc.
        This is needed because, when accessing a file, the BE gets an hdfs client
        from the hdfs client cache, and different files in one query request may
        have different fs names, e.g., some are `hdfs://` and some are `cosn://`,
        so we need to specify the fs name for each file; otherwise, it may return
        an error like:
    
    `reason: IllegalArgumentException: Wrong FS: cosn://doris-build-1308700295/xxxx,
    expected: hdfs://172.xxxx:4007`
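    
    As a minimal, hypothetical sketch (not the code in this patch; the helper
    and class names below are made up for illustration), the per-file fs name
    lets the BE pick a matching client from a cache instead of reusing whichever
    file system the query happened to open first:
    
        #include <iostream>
        #include <map>
        #include <memory>
        #include <string>
    
        // Extract "scheme://authority" from a full path, similar in spirit to
        // what the FE does when filling fs_name for each file range.
        std::string fs_name_of(const std::string& path) {
            auto scheme_end = path.find("://");
            if (scheme_end == std::string::npos) {
                return "";  // local path, no scheme
            }
            auto authority_end = path.find('/', scheme_end + 3);
            return authority_end == std::string::npos
                           ? path
                           : path.substr(0, authority_end);
        }
    
        struct FsClient {
            std::string fs_name;  // e.g. "hdfs://nameservice1" or "cosn://bucket"
        };
    
        class FsClientCache {
        public:
            std::shared_ptr<FsClient> get(const std::string& fs_name) {
                auto it = _cache.find(fs_name);
                if (it != _cache.end()) {
                    return it->second;  // reuse the client for this file system
                }
                auto client = std::make_shared<FsClient>();
                client->fs_name = fs_name;
                _cache.emplace(fs_name, client);
                return client;
            }
    
        private:
            std::map<std::string, std::shared_ptr<FsClient>> _cache;
        };
    
        int main() {
            FsClientCache cache;
            // Two files from the same query, on different storage systems.
            auto a = cache.get(fs_name_of("hdfs://nameservice1/warehouse/t/part1/f1"));
            auto b = cache.get(fs_name_of("cosn://doris-build-1308700295/warehouse/t/part2/f2"));
            std::cout << a->fs_name << " vs " << b->fs_name << std::endl;
            return 0;
        }
    
    In the actual BE code the cache key also mixes in user and kerberos settings
    (see _hdfs_hash_code in be/src/io/fs/hdfs_file_system.cpp); the point here is
    only that fs_name has to be part of the key.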
---
 be/src/io/file_factory.cpp                         |  4 +-
 be/src/io/fs/benchmark/hdfs_benchmark.hpp          |  6 +--
 be/src/io/fs/fs_utils.h                            |  4 ++
 be/src/io/fs/hdfs_file_reader.cpp                  |  5 +-
 be/src/io/fs/hdfs_file_system.cpp                  | 62 +++++++++++++---------
 be/src/io/fs/hdfs_file_system.h                    |  2 +-
 be/src/io/fs/hdfs_file_writer.cpp                  | 12 ++---
 be/src/io/hdfs_builder.cpp                         | 11 ++--
 be/src/io/hdfs_builder.h                           | 13 ++---
 be/src/runtime/snapshot_loader.cpp                 |  2 +-
 be/src/util/hdfs_util.cpp                          |  4 ++
 be/src/util/hdfs_util.h                            |  5 ++
 be/src/vec/exec/format/csv/csv_reader.cpp          |  3 ++
 be/src/vec/exec/format/json/new_json_reader.cpp    |  4 ++
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  3 ++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  3 ++
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  2 +
 .../format/table/transactional_hive_reader.cpp     |  2 +
 be/src/vec/runtime/vfile_result_writer.cpp         |  3 +-
 fe/be-java-extensions/preload-extensions/pom.xml   |  6 +++
 .../doris/planner/external/FileQueryScanNode.java  | 14 ++---
 .../doris/planner/external/HiveScanNode.java       |  2 +-
 .../planner/external/iceberg/IcebergScanNode.java  |  2 +-
 .../planner/external/paimon/PaimonScanNode.java    |  2 +-
 gensrc/thrift/PlanNodes.thrift                     |  3 ++
 .../hive/test_mixed_par_locations.groovy           |  9 ++--
 26 files changed, 118 insertions(+), 70 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index d46d2c5b4c..42d55d5fc0 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, 
ExecEnv* env,
     case TFileType::FILE_HDFS: {
         THdfsParams hdfs_params = parse_properties(properties);
         std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, 
hdfs_params.fs_name, nullptr, &fs));
         RETURN_IF_ERROR(fs->create_file(path, &file_writer));
         break;
     }
@@ -181,7 +181,7 @@ Status FileFactory::create_hdfs_reader(const THdfsParams& 
hdfs_params,
                                        std::shared_ptr<io::FileSystem>* 
hdfs_file_system,
                                        io::FileReaderSPtr* reader, 
RuntimeProfile* profile) {
     std::shared_ptr<io::HdfsFileSystem> fs;
-    RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs));
+    RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, fd.fs_name, 
profile, &fs));
     RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader));
     *hdfs_file_system = std::move(fs);
     return Status::OK();
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp 
b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
index 2a6f97d5e8..8a5b6f240f 100644
--- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -96,7 +96,7 @@ public:
         std::shared_ptr<io::HdfsFileSystem> fs;
         io::FileWriterPtr writer;
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, 
hdfs_params.fs_name, nullptr, &fs));
         RETURN_IF_ERROR(fs->create_file(file_path, &writer));
         return write(state, writer.get());
     }
@@ -117,7 +117,7 @@ public:
         auto new_file_path = file_path + "_new";
         THdfsParams hdfs_params = parse_properties(_conf_map);
         std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, 
hdfs_params.fs_name, nullptr, &fs));
 
         auto start = std::chrono::high_resolution_clock::now();
         RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
@@ -144,7 +144,7 @@ public:
 
         std::shared_ptr<io::HdfsFileSystem> fs;
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, 
hdfs_params.fs_name, nullptr, &fs));
 
         auto start = std::chrono::high_resolution_clock::now();
         bool res = false;
diff --git a/be/src/io/fs/fs_utils.h b/be/src/io/fs/fs_utils.h
index 64ad2c6ea8..2befc58a3e 100644
--- a/be/src/io/fs/fs_utils.h
+++ b/be/src/io/fs/fs_utils.h
@@ -46,6 +46,10 @@ struct FileDescription {
     // modification time of this file.
     // 0 means unset.
     int64_t mtime = 0;
+    // for hdfs, eg: hdfs://nameservices1/
+    // because for a hive table, differenet partitions may have different
+    // locations(or fs), so different files may have different fs.
+    std::string fs_name;
 };
 
 } // namespace io
diff --git a/be/src/io/fs/hdfs_file_reader.cpp 
b/be/src/io/fs/hdfs_file_reader.cpp
index 0f999dfc98..6c4f456e37 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -31,6 +31,7 @@
 // #include "io/fs/hdfs_file_system.h"
 #include "service/backend_options.h"
 #include "util/doris_metrics.h"
+#include "util/hdfs_util.h"
 
 namespace doris {
 namespace io {
@@ -45,7 +46,7 @@ HdfsFileReader::HdfsFileReader(Path path, const std::string& 
name_node,
 
     DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
     DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
-    if (_profile != nullptr) {
+    if (_profile != nullptr && is_hdfs(_name_node)) {
 #ifdef USE_HADOOP_HDFS
         const char* hdfs_profile_name = "HdfsIO";
         ADD_TIMER(_profile, hdfs_profile_name);
@@ -76,7 +77,7 @@ Status HdfsFileReader::close() {
     bool expected = false;
     if (_closed.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
         DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
-        if (_profile != nullptr) {
+        if (_profile != nullptr && is_hdfs(_name_node)) {
 #ifdef USE_HADOOP_HDFS
             struct hdfsReadStatistics* hdfs_statistics = nullptr;
             auto r = hdfsFileGetReadStatistics(_handle->file(), 
&hdfs_statistics);
diff --git a/be/src/io/fs/hdfs_file_system.cpp 
b/be/src/io/fs/hdfs_file_system.cpp
index 4473ea2a3a..9cca7fdad0 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -69,7 +69,8 @@ public:
     const HdfsFileSystemCache& operator=(const HdfsFileSystemCache&) = delete;
 
     // This function is thread-safe
-    Status get_connection(const THdfsParams& hdfs_params, 
HdfsFileSystemHandle** fs_handle);
+    Status get_connection(const THdfsParams& hdfs_params, const std::string& 
fs_name,
+                          HdfsFileSystemHandle** fs_handle);
 
 private:
     std::mutex _lock;
@@ -77,8 +78,9 @@ private:
 
     HdfsFileSystemCache() = default;
 
-    uint64 _hdfs_hash_code(const THdfsParams& hdfs_params);
-    Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* 
is_kerberos);
+    uint64 _hdfs_hash_code(const THdfsParams& hdfs_params, const std::string& 
fs_name);
+    Status _create_fs(const THdfsParams& hdfs_params, const std::string& 
fs_name, hdfsFS* fs,
+                      bool* is_kerberos);
     void _clean_invalid();
     void _clean_oldest();
 };
@@ -117,7 +119,7 @@ Status HdfsFileHandleCache::get_file(const 
std::shared_ptr<HdfsFileSystem>& fs,
     return Status::OK();
 }
 
-Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const 
std::string& path,
+Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const 
std::string& fs_name,
                               RuntimeProfile* profile, 
std::shared_ptr<HdfsFileSystem>* fs) {
 #ifdef USE_HADOOP_HDFS
     if (!config::enable_java_support) {
@@ -126,17 +128,21 @@ Status HdfsFileSystem::create(const THdfsParams& 
hdfs_params, const std::string&
                 "true.");
     }
 #endif
-    (*fs).reset(new HdfsFileSystem(hdfs_params, path, profile));
+    (*fs).reset(new HdfsFileSystem(hdfs_params, fs_name, profile));
     return (*fs)->connect();
 }
 
-HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const 
std::string& path,
+HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const 
std::string& fs_name,
                                RuntimeProfile* profile)
-        : RemoteFileSystem(path, "", FileSystemType::HDFS),
+        : RemoteFileSystem("", "", FileSystemType::HDFS),
           _hdfs_params(hdfs_params),
           _fs_handle(nullptr),
           _profile(profile) {
-    _namenode = _hdfs_params.fs_name;
+    if (_hdfs_params.__isset.fs_name) {
+        _fs_name = _hdfs_params.fs_name;
+    } else {
+        _fs_name = fs_name;
+    }
 }
 
 HdfsFileSystem::~HdfsFileSystem() {
@@ -150,7 +156,8 @@ HdfsFileSystem::~HdfsFileSystem() {
 }
 
 Status HdfsFileSystem::connect_impl() {
-    
RETURN_IF_ERROR(HdfsFileSystemCache::instance()->get_connection(_hdfs_params, 
&_fs_handle));
+    RETURN_IF_ERROR(
+            HdfsFileSystemCache::instance()->get_connection(_hdfs_params, 
_fs_name, &_fs_handle));
     if (!_fs_handle) {
         return Status::IOError("failed to init Hdfs handle with, please check 
hdfs params.");
     }
@@ -165,20 +172,20 @@ Status HdfsFileSystem::create_file_impl(const Path& file, 
FileWriterPtr* writer)
 Status HdfsFileSystem::open_file_internal(const FileDescription& fd, const 
Path& abs_path,
                                           FileReaderSPtr* reader) {
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = convert_path(abs_path, _namenode);
+    Path real_path = convert_path(abs_path, _fs_name);
 
     FileHandleCache::Accessor accessor;
     RETURN_IF_ERROR(HdfsFileHandleCache::instance()->get_file(
             std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), 
real_path, fd.mtime,
             fd.file_size, &accessor));
 
-    *reader = std::make_shared<HdfsFileReader>(abs_path, _namenode, 
std::move(accessor), _profile);
+    *reader = std::make_shared<HdfsFileReader>(abs_path, _fs_name, 
std::move(accessor), _profile);
     return Status::OK();
 }
 
 Status HdfsFileSystem::create_directory_impl(const Path& dir, bool 
failed_if_exists) {
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = convert_path(dir, _namenode);
+    Path real_path = convert_path(dir, _fs_name);
     int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, 
real_path.string().c_str());
     if (res == -1) {
         return Status::IOError("failed to create directory {}: {}", 
dir.native(), hdfs_error());
@@ -208,7 +215,7 @@ Status HdfsFileSystem::delete_internal(const Path& path, 
int is_recursive) {
         return Status::OK();
     }
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = convert_path(path, _namenode);
+    Path real_path = convert_path(path, _fs_name);
     int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), 
is_recursive);
     if (res == -1) {
         return Status::IOError("failed to delete directory {}: {}", 
path.native(), hdfs_error());
@@ -218,7 +225,7 @@ Status HdfsFileSystem::delete_internal(const Path& path, 
int is_recursive) {
 
 Status HdfsFileSystem::exists_impl(const Path& path, bool* res) const {
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = convert_path(path, _namenode);
+    Path real_path = convert_path(path, _fs_name);
     int is_exists = hdfsExists(_fs_handle->hdfs_fs, 
real_path.string().c_str());
 #ifdef USE_HADOOP_HDFS
     // when calling hdfsExists() and return non-zero code,
@@ -236,7 +243,7 @@ Status HdfsFileSystem::exists_impl(const Path& path, bool* 
res) const {
 
 Status HdfsFileSystem::file_size_impl(const Path& path, int64_t* file_size) 
const {
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = convert_path(path, _namenode);
+    Path real_path = convert_path(path, _fs_name);
     hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, 
real_path.string().c_str());
     if (file_info == nullptr) {
         return Status::IOError("failed to get file size of {}: {}", 
path.native(), hdfs_error());
@@ -254,7 +261,7 @@ Status HdfsFileSystem::list_impl(const Path& path, bool 
only_file, std::vector<F
     }
 
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = convert_path(path, _namenode);
+    Path real_path = convert_path(path, _fs_name);
     int numEntries = 0;
     hdfsFileInfo* hdfs_file_info =
             hdfsListDirectory(_fs_handle->hdfs_fs, real_path.c_str(), 
&numEntries);
@@ -278,8 +285,8 @@ Status HdfsFileSystem::list_impl(const Path& path, bool 
only_file, std::vector<F
 }
 
 Status HdfsFileSystem::rename_impl(const Path& orig_name, const Path& 
new_name) {
-    Path normal_orig_name = convert_path(orig_name, _namenode);
-    Path normal_new_name = convert_path(new_name, _namenode);
+    Path normal_orig_name = convert_path(orig_name, _fs_name);
+    Path normal_new_name = convert_path(new_name, _fs_name);
     int ret = hdfsRename(_fs_handle->hdfs_fs, normal_orig_name.c_str(), 
normal_new_name.c_str());
     if (ret == 0) {
         LOG(INFO) << "finished to rename file. orig: " << normal_orig_name
@@ -424,15 +431,14 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
 // ************* HdfsFileSystemCache ******************
 int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
 
-Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* 
fs,
-                                       bool* is_kerberos) {
+Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, const 
std::string& fs_name,
+                                       hdfsFS* fs, bool* is_kerberos) {
     HDFSCommonBuilder builder;
-    RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
+    RETURN_IF_ERROR(create_hdfs_builder(hdfs_params, fs_name, &builder));
     *is_kerberos = builder.is_need_kinit();
     hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
     if (hdfs_fs == nullptr) {
-        return Status::IOError("faield to connect to hdfs {}: {}", 
hdfs_params.fs_name,
-                               hdfs_error());
+        return Status::IOError("faield to connect to hdfs {}: {}", fs_name, 
hdfs_error());
     }
     *fs = hdfs_fs;
     return Status::OK();
@@ -463,8 +469,9 @@ void HdfsFileSystemCache::_clean_oldest() {
 }
 
 Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
+                                           const std::string& fs_name,
                                            HdfsFileSystemHandle** fs_handle) {
-    uint64 hash_code = _hdfs_hash_code(hdfs_params);
+    uint64 hash_code = _hdfs_hash_code(hdfs_params, fs_name);
     {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _cache.find(hash_code);
@@ -484,7 +491,7 @@ Status HdfsFileSystemCache::get_connection(const 
THdfsParams& hdfs_params,
         // create a new one and try to put it into cache
         hdfsFS hdfs_fs = nullptr;
         bool is_kerberos = false;
-        RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos));
+        RETURN_IF_ERROR(_create_fs(hdfs_params, fs_name, &hdfs_fs, 
&is_kerberos));
         if (_cache.size() >= MAX_CACHE_HANDLE) {
             _clean_invalid();
             _clean_oldest();
@@ -502,10 +509,13 @@ Status HdfsFileSystemCache::get_connection(const 
THdfsParams& hdfs_params,
     return Status::OK();
 }
 
-uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params) {
+uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params,
+                                            const std::string& fs_name) {
     uint64 hash_code = 0;
     if (hdfs_params.__isset.fs_name) {
         hash_code += Fingerprint(hdfs_params.fs_name);
+    } else {
+        hash_code += Fingerprint(fs_name);
     }
     if (hdfs_params.__isset.user) {
         hash_code += Fingerprint(hdfs_params.user);
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index f365891aa9..229d236195 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -153,7 +153,7 @@ private:
     HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
                    RuntimeProfile* profile);
     const THdfsParams& _hdfs_params;
-    std::string _namenode;
+    std::string _fs_name;
     // do not use std::shared_ptr or std::unique_ptr
     // _fs_handle is managed by HdfsFileSystemCache
     HdfsFileSystemHandle* _fs_handle;
diff --git a/be/src/io/fs/hdfs_file_writer.cpp 
b/be/src/io/fs/hdfs_file_writer.cpp
index 5eaaa8a404..936defb2f6 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -58,7 +58,7 @@ Status HdfsFileWriter::close() {
         std::stringstream ss;
         ss << "failed to flush hdfs file. "
            << "(BE: " << BackendOptions::get_localhost() << ")"
-           << "namenode:" << _hdfs_fs->_namenode << " path:" << _path << ", 
err: " << hdfs_error();
+           << "namenode:" << _hdfs_fs->_fs_name << " path:" << _path << ", 
err: " << hdfs_error();
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
@@ -88,7 +88,7 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t 
data_cnt) {
                     hdfsWrite(_hdfs_fs->_fs_handle->hdfs_fs, _hdfs_file, p, 
left_bytes);
             if (written_bytes < 0) {
                 return Status::InternalError("write hdfs failed. namenode: {}, 
path: {}, error: {}",
-                                             _hdfs_fs->_namenode, 
_path.native(), hdfs_error());
+                                             _hdfs_fs->_fs_name, 
_path.native(), hdfs_error());
             }
             left_bytes -= written_bytes;
             p += written_bytes;
@@ -109,7 +109,7 @@ Status HdfsFileWriter::finalize() {
 }
 
 Status HdfsFileWriter::_open() {
-    _path = convert_path(_path, _hdfs_fs->_namenode);
+    _path = convert_path(_path, _hdfs_fs->_fs_name);
     std::string hdfs_dir = _path.parent_path().string();
     int exists = hdfsExists(_hdfs_fs->_fs_handle->hdfs_fs, hdfs_dir.c_str());
     if (exists != 0) {
@@ -119,7 +119,7 @@ Status HdfsFileWriter::_open() {
             std::stringstream ss;
             ss << "create dir failed. "
                << "(BE: " << BackendOptions::get_localhost() << ")"
-               << " namenode: " << _hdfs_fs->_namenode << " path: " << hdfs_dir
+               << " namenode: " << _hdfs_fs->_fs_name << " path: " << hdfs_dir
                << ", err: " << hdfs_error();
             LOG(WARNING) << ss.str();
             return Status::InternalError(ss.str());
@@ -131,11 +131,11 @@ Status HdfsFileWriter::_open() {
         std::stringstream ss;
         ss << "open file failed. "
            << "(BE: " << BackendOptions::get_localhost() << ")"
-           << " namenode:" << _hdfs_fs->_namenode << " path:" << _path << ", 
err: " << hdfs_error();
+           << " namenode:" << _hdfs_fs->_fs_name << " path:" << _path << ", 
err: " << hdfs_error();
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
-    VLOG_NOTICE << "open file. namenode:" << _hdfs_fs->_namenode << ", path:" 
<< _path;
+    VLOG_NOTICE << "open file. namenode:" << _hdfs_fs->_fs_name << ", path:" 
<< _path;
     return Status::OK();
 }
 
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index b420c84e13..754eb61680 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -109,9 +109,10 @@ THdfsParams parse_properties(const std::map<std::string, 
std::string>& propertie
     return hdfsParams;
 }
 
-Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* 
builder) {
+Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& 
fs_name,
+                           HDFSCommonBuilder* builder) {
     RETURN_IF_ERROR(builder->init_hdfs_builder());
-    hdfsBuilderSetNameNode(builder->get(), hdfsParams.fs_name.c_str());
+    hdfsBuilderSetNameNode(builder->get(), fs_name.c_str());
     // set kerberos conf
     if (hdfsParams.__isset.hdfs_kerberos_principal) {
         builder->need_kinit = true;
@@ -165,10 +166,10 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, 
HDFSCommonBuilder* build
     return Status::OK();
 }
 
-Status createHDFSBuilder(const std::map<std::string, std::string>& properties,
-                         HDFSCommonBuilder* builder) {
+Status create_hdfs_builder(const std::map<std::string, std::string>& 
properties,
+                           HDFSCommonBuilder* builder) {
     THdfsParams hdfsParams = parse_properties(properties);
-    return createHDFSBuilder(hdfsParams, builder);
+    return create_hdfs_builder(hdfsParams, hdfsParams.fs_name, builder);
 }
 
 } // namespace doris
diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h
index d3f7c35017..159544fc58 100644
--- a/be/src/io/hdfs_builder.h
+++ b/be/src/io/hdfs_builder.h
@@ -36,9 +36,10 @@ const std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
 const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_";
 
 class HDFSCommonBuilder {
-    friend Status createHDFSBuilder(const THdfsParams& hdfsParams, 
HDFSCommonBuilder* builder);
-    friend Status createHDFSBuilder(const std::map<std::string, std::string>& 
properties,
-                                    HDFSCommonBuilder* builder);
+    friend Status create_hdfs_builder(const THdfsParams& hdfsParams, const 
std::string& fs_name,
+                                      HDFSCommonBuilder* builder);
+    friend Status create_hdfs_builder(const std::map<std::string, 
std::string>& properties,
+                                      HDFSCommonBuilder* builder);
 
 public:
     HDFSCommonBuilder() {}
@@ -67,8 +68,8 @@ private:
 
 THdfsParams parse_properties(const std::map<std::string, std::string>& 
properties);
 
-Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* 
builder);
-Status createHDFSBuilder(const std::map<std::string, std::string>& properties,
-                         HDFSCommonBuilder* builder);
+Status create_hdfs_builder(const THdfsParams& hdfsParams, HDFSCommonBuilder* 
builder);
+Status create_hdfs_builder(const std::map<std::string, std::string>& 
properties,
+                           HDFSCommonBuilder* builder);
 
 } // namespace doris
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index 3ff8229bc3..d52afb0b85 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, 
const std::string& l
     } else if (TStorageBackendType::type::HDFS == type) {
         THdfsParams hdfs_params = parse_properties(_prop);
         std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&fs));
+        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, 
hdfs_params.fs_name, nullptr, &fs));
         _remote_fs = std::move(fs);
     } else if (TStorageBackendType::type::BROKER == type) {
         std::shared_ptr<io::BrokerFileSystem> fs;
diff --git a/be/src/util/hdfs_util.cpp b/be/src/util/hdfs_util.cpp
index 643053292e..f82b51cb27 100644
--- a/be/src/util/hdfs_util.cpp
+++ b/be/src/util/hdfs_util.cpp
@@ -50,5 +50,9 @@ Path convert_path(const Path& path, const std::string& 
namenode) {
     return real_path;
 }
 
+bool is_hdfs(const std::string& path_or_fs) {
+    return path_or_fs.rfind("hdfs://") == 0;
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/util/hdfs_util.h b/be/src/util/hdfs_util.h
index 39dc8bc416..0e10cc578b 100644
--- a/be/src/util/hdfs_util.h
+++ b/be/src/util/hdfs_util.h
@@ -43,5 +43,10 @@ private:
 // path like hdfs://ip:port/path can't be used by libhdfs3.
 Path convert_path(const Path& path, const std::string& namenode);
 
+std::string get_fs_name(const std::string& path);
+
+// return true if path_or_fs contains "hdfs://"
+bool is_hdfs(const std::string& path_or_fs);
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 10b5ced968..6def26def0 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -233,6 +233,9 @@ void CsvReader::_init_file_description() {
     _file_description.path = _range.path;
     _file_description.start_offset = _range.start_offset;
     _file_description.file_size = _range.__isset.file_size ? _range.file_size 
: 0;
+    if (_range.__isset.fs_name) {
+        _file_description.fs_name = _range.fs_name;
+    }
 }
 
 Status CsvReader::init_reader(bool is_load) {
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index b02c30807d..5dbd8fa8bc 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -154,6 +154,10 @@ void NewJsonReader::_init_file_description() {
     _file_description.path = _range.path;
     _file_description.start_offset = _range.start_offset;
     _file_description.file_size = _range.__isset.file_size ? _range.file_size 
: 0;
+
+    if (_range.__isset.fs_name) {
+        _file_description.fs_name = _range.fs_name;
+    }
 }
 
 Status NewJsonReader::init_reader(
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 6878cecfad..fe29180585 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -905,6 +905,9 @@ void OrcReader::_init_file_description() {
     _file_description.path = _scan_range.path;
     _file_description.start_offset = _scan_range.start_offset;
     _file_description.file_size = _scan_range.__isset.file_size ? 
_scan_range.file_size : 0;
+    if (_scan_range.__isset.fs_name) {
+        _file_description.fs_name = _scan_range.fs_name;
+    }
 }
 
 TypeDescriptor OrcReader::_convert_to_doris_type(const orc::Type* orc_type) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index f8dab141b7..e627a96e0d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -291,6 +291,9 @@ void ParquetReader::_init_file_description() {
     _file_description.path = _scan_range.path;
     _file_description.start_offset = _scan_range.start_offset;
     _file_description.file_size = _scan_range.__isset.file_size ? 
_scan_range.file_size : 0;
+    if (_scan_range.__isset.fs_name) {
+        _file_description.fs_name = _scan_range.fs_name;
+    }
 }
 
 Status ParquetReader::init_reader(
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 1b0f05cc95..95994723ae 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -261,6 +261,8 @@ Status IcebergTableReader::_position_delete(
         DeleteFile* delete_file_cache = _kv_cache->get<
                 DeleteFile>(_delet_file_cache_key(delete_file.path), [&]() -> 
DeleteFile* {
             TFileRangeDesc delete_range;
+            // must use __set() method to make sure __isset is true
+            delete_range.__set_fs_name(_range.fs_name);
             delete_range.path = delete_file.path;
             delete_range.start_offset = 0;
             delete_range.size = -1;
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp 
b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index 79341f40aa..dbac1386d0 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -129,6 +129,8 @@ Status TransactionalHiveReader::init_row_filters(const 
TFileRangeDesc& range) {
         auto delete_file = fmt::format("{}/{}", 
delete_delta.directory_location, file_name);
 
         TFileRangeDesc delete_range;
+        // must use __set() method to make sure __isset is true
+        delete_range.__set_fs_name(_range.fs_name);
         delete_range.path = delete_file;
         delete_range.start_offset = 0;
         delete_range.size = -1;
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp 
b/be/src/vec/runtime/vfile_result_writer.cpp
index 0f5d5c416d..9d5fc4e158 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -599,7 +599,8 @@ Status VFileResultWriter::_delete_dir() {
     case TStorageBackendType::HDFS: {
         THdfsParams hdfs_params = 
parse_properties(_file_opts->broker_properties);
         std::shared_ptr<io::HdfsFileSystem> hdfs_fs = nullptr;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, 
&hdfs_fs));
+        RETURN_IF_ERROR(
+                io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, 
nullptr, &hdfs_fs));
         file_system = hdfs_fs;
         break;
     }
diff --git a/fe/be-java-extensions/preload-extensions/pom.xml 
b/fe/be-java-extensions/preload-extensions/pom.xml
index db8663212a..830ee1ea8f 100644
--- a/fe/be-java-extensions/preload-extensions/pom.xml
+++ b/fe/be-java-extensions/preload-extensions/pom.xml
@@ -214,6 +214,12 @@ under the License.
             <groupId>org.apache.doris</groupId>
             <artifactId>hive-catalog-shade</artifactId>
         </dependency>
+        <!-- For BE CosN Access -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-cos</artifactId>
+            <version>3.3.5</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index f5021c2608..40dba10c98 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -73,6 +73,7 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -358,9 +359,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             Map<String, String> locationProperties) throws UserException {
         if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
             if (!params.isSetHdfsParams()) {
-                String fsName = getFsName(fileSplit);
                 THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
-                tHdfsParams.setFsName(fsName);
+                // tHdfsParams.setFsName(getFsName(fileSplit));
                 params.setHdfsParams(tHdfsParams);
             }
 
@@ -413,14 +413,10 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
 
         rangeDesc.setFileType(locationType);
+        rangeDesc.setPath(fileSplit.getPath().toString());
         if (locationType == TFileType.FILE_HDFS) {
-            rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (locationType == TFileType.FILE_S3
-                || locationType == TFileType.FILE_BROKER
-                || locationType == TFileType.FILE_LOCAL
-                || locationType == TFileType.FILE_NET) {
-            // need full path
-            rangeDesc.setPath(fileSplit.getPath().toString());
+            URI fileUri = fileSplit.getPath().toUri();
+            rangeDesc.setFsName(fileUri.getScheme() + "://" + 
fileUri.getAuthority());
         }
         rangeDesc.setModificationTime(fileSplit.getModificationTime());
         return rangeDesc;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index d3e80ba4d9..1209be1cb9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -336,7 +336,7 @@ public class HiveScanNode extends FileQueryScanNode {
 
     @Override
     protected Map<String, String> getLocationProperties() throws UserException 
 {
-        return hmsTable.getCatalogProperties();
+        return hmsTable.getHadoopProperties();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 9dcd1716be..4837ba5545 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -362,7 +362,7 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     @Override
     public Map<String, String> getLocationProperties() throws UserException {
-        return source.getCatalog().getProperties();
+        return source.getCatalog().getCatalogProperty().getHadoopProperties();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index be3b04fb33..a68d5edcb2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -174,7 +174,7 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
-        return source.getCatalog().getProperties();
+        return source.getCatalog().getCatalogProperty().getHadoopProperties();
     }
 
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 6be2a508f6..bb670c7063 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -403,6 +403,9 @@ struct TFileRangeDesc {
     9: optional i64 modification_time
     10: optional Types.TFileType file_type;
     11: optional TFileCompressType compress_type;
+    // for hive table, different files may have different fs,
+    // so fs_name should be with TFileRangeDesc
+    12: optional string fs_name
 }
 
 // TFileScanRange represents a set of descriptions of a file and the rules for 
reading and converting it.
diff --git 
a/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy 
b/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy
index ec092f99e7..ca8278bfdc 100644
--- 
a/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy
+++ 
b/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy
@@ -29,7 +29,7 @@ suite("test_mixed_par_locations", "p2") {
             String extHiveHmsPort = 
context.config.otherConfigs.get("extHiveHmsPort")
             String extAk = context.config.otherConfigs.get("extAk");
             String extSk = context.config.otherConfigs.get("extSk");
-            String ext3Endpoint = 
context.config.otherConfigs.get("ext3Endpoint");
+            String extS3Endpoint = 
context.config.otherConfigs.get("extS3Endpoint");
             String extS3Region = 
context.config.otherConfigs.get("extS3Region");
             String catalog_name = "test_mixed_par_locations"
 
@@ -38,10 +38,9 @@ suite("test_mixed_par_locations", "p2") {
                 create catalog if not exists ${catalog_name} properties (
                     'type'='hms',
                     'hive.metastore.uris' = 
'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
-                    'AWS_ACCESS_KEY' = "${extAk}",
-                    'AWS_SECRET_KEY' = "${extSk}",
-                    'AWS_ENDPOINT' = "${ext3Endpoint}",
-                    'AWS_REGION' = "${extS3Region}"
+                    'cos.access_key' = '${extAk}',
+                    'cos.secret_key' = '${extSk}',
+                    'cos.endpoint' = '${extS3Endpoint}'
                 );
             """
             logger.info("catalog " + catalog_name + " created")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

