github-actions[bot] commented on code in PR #17586:
URL: https://github.com/apache/doris/pull/17586#discussion_r1131998703


##########
be/src/io/fs/file_system.h:
##########
@@ -21,58 +21,111 @@
 
 #include "common/status.h"
 #include "gutil/macros.h"
-#include "io/fs/file_reader.h"
 #include "io/fs/file_reader_options.h"
-#include "io/fs/file_writer.h"
 #include "io/fs/path.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/io_common.h"
 
 namespace doris {
 namespace io {
 
-class FileWriter;
-class FileReader;
-
 enum class FileSystemType : uint8_t {
     LOCAL,
     S3,
     HDFS,
     BROKER,
 };
 
+struct FileInfo {
+    // only file name, no path
+    std::string file_name;
+    size_t file_size;
+    bool is_file;
+};
+
 class FileSystem : public std::enable_shared_from_this<FileSystem> {
 public:
+    // The following are public interface.
+    // And derived classes should implement all xxx_impl methods.
+    Status create_file(const Path& file, FileWriterPtr* writer);
+    Status open_file(const Path& file, const FileReaderOptions& reader_options,
+                             FileReaderSPtr* reader, IOContext* io_ctx);
+    Status create_directory(const Path& dir);
+    Status delete_file(const Path& file);
+    Status delete_directory(const Path& dir);
+    Status batch_delete(const std::vector<Path>& files);
+    Status exists(const Path& path, bool* res) const;
+    Status file_size(const Path& file, size_t* file_size) const;
+    Status list(const Path& dir, bool regular_file, std::vector<FileInfo>* 
files);
+    Status rename(const Path& orig_name, const Path& new_name);
+    Status rename_dir(const Path& orig_name, const Path& new_name);
+
+public:

Review Comment:
   warning: redundant access specifier has the same accessibility as the 
previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/io/fs/file_system.h:46:** previously declared here
   ```cpp
   public:
   ^
   ```
   



##########
be/src/io/file_factory.cpp:
##########
@@ -56,8 +55,7 @@ Status FileFactory::create_file_writer(TFileType::type type, 
ExecEnv* env,
         break;
     }
     case TFileType::FILE_HDFS: {
-        RETURN_IF_ERROR(create_hdfs_writer(
-                const_cast<std::map<std::string, std::string>&>(properties), 
path, file_writer));
+        file_writer.reset(new HDFSWriter(const_cast<std::map<std::string, 
std::string>&>(properties), path));

Review Comment:
   warning: unknown type name 'HDFSWriter' [clang-diagnostic-error]
   ```cpp
           file_writer.reset(new HDFSWriter(const_cast<std::map<std::string, 
std::string>&>(properties), path));
                                 ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)

Review Comment:
   warning: use of undeclared identifier 'HdfsFileWriter' 
[clang-diagnostic-error]
   ```cpp
   HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
   ^
   ```
   



##########
be/src/io/fs/hdfs_file_system.h:
##########
@@ -83,47 +83,39 @@ class HdfsFileSystem final : public RemoteFileSystem {
 public:
     static std::shared_ptr<HdfsFileSystem> create(const THdfsParams& 
hdfs_params,
                                                   const std::string& path);
-
     ~HdfsFileSystem() override;
 
-    Status create_file(const Path& path, FileWriterPtr* writer) override;
-
-    Status open_file(const Path& path, FileReaderSPtr* reader, IOContext* 
io_ctx) override;
-
-    Status delete_file(const Path& path) override;
-
-    Status create_directory(const Path& path) override;
-
-    // Delete all files under path.
-    Status delete_directory(const Path& path) override;
-
-    Status link_file(const Path& /*src*/, const Path& /*dest*/) override {
-        return Status::NotSupported("Not supported");
-    }
-
-    Status exists(const Path& path, bool* res) const override;
-
-    Status file_size(const Path& path, size_t* file_size) const override;
-
-    Status list(const Path& path, std::vector<Path>* files) override;
-
-    Status upload(const Path& /*local_path*/, const Path& /*dest_path*/) 
override {
-        return Status::NotSupported("Currently not support to upload file to 
HDFS");
-    }
-
-    Status batch_upload(const std::vector<Path>& /*local_paths*/,
-                        const std::vector<Path>& /*dest_paths*/) override {
-        return Status::NotSupported("Currently not support to batch upload 
file to HDFS");
-    }
+    HdfsFileSystemHandle* get_handle();
 
-    Status connect() override;
+protected:
+    Status connect_impl() override;
+    Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
+    Status open_file_internal(const Path& file, FileReaderSPtr* reader) 
override;
+    Status create_directory_impl(const Path& dir) override;
+    Status delete_file_impl(const Path& file) override;
+    Status delete_directory_impl(const Path& dir) override;
+    Status batch_delete_impl(const std::vector<Path>& files) override;
+    Status exists_impl(const Path& path, bool* res) const override;
+    Status file_size_impl(const Path& file, size_t* file_size) const override;
+    Status list_impl(const Path& dir, bool is_regular, std::vector<FileInfo>* 
files) override;
+    Status rename_impl(const Path& orig_name, const Path& new_name) override;
+    Status rename_dir_impl(const Path& orig_name, const Path& new_name) 
override;
+
+    Status upload_impl(const Path& local_file, const Path& remote_file) 
override;
+    Status batch_upload_impl(const std::vector<Path>& local_files,
+                                const std::vector<Path>& remote_files) 
override;
+    Status direct_upload_impl(const Path& remote_file, const std::string& 
content) override;
+    Status upload_with_checksum_impl(const Path& local_file, const Path& 
remote_file,
+                                        const std::string& checksum) override;
+    Status download_impl(const Path& remote_file, const Path& local_file) 
override;
+    Status direct_download_impl(const Path& remote_file, std::string* content) 
override;
 
-    HdfsFileSystemHandle* get_handle();
+private:
+    Status delete_internal(const Path& path, int is_recursive);
 
 private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the 
previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/io/fs/hdfs_file_system.h:112:** previously declared here
   ```cpp
   private:
   ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);

Review Comment:
   warning: use of undeclared identifier '_type' [clang-diagnostic-error]
   ```cpp
       AsyncIO::run_task(task, _type);
                               ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
+        : FileWriter(file, fs){
+}
+
+HdfsFileWriter::~HdfsFileWriter() {
+    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << 
_closed;
+}
+
+Status HdfsFileWriter::close() {
+    if (_closed) {
+        return Status::OK();
+    }
+    _closed = true;
+    if (_hdfs_file == nullptr) {
+        return Status::OK();
+    }
+    int result = hdfsFlush(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    if (result == -1) {
+        std::stringstream ss;
+        ss << "failed to flush hdfs file. "
+           << "(BE: " << BackendOptions::get_localhost() << ")"
+           << "namenode:" << _fs->_namenode << " path:" << _file << ", err: " 
<< hdfsGetLastError();
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    hdfsCloseFile(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    _hdfs_file = nullptr;
+    return Status::OK();
+}
+
+Status HdfsFileWriter::abort() {
+    // TODO: should delete remote file
+    return Status::OK();
+}
+
+Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
+    DCHECK(!_closed);
+    if (!_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+
+    for (size_t i = 0; i < data_cnt; i++) {
+        const Slice& result = data[i];
+        size_t left_bytes = result.size;
+        const char* p = result.data;
+        while (left_bytes > 0) {
+            size_t written_bytes = hdfsWrite(_fs->_fs_handle->hdfs_fs, 
_hdfs_file, p, left_bytes);
+            if (written_bytes < 0) {
+                return Status::InternalError("write hdfs failed. namenode: {}, 
path: {}, error: {}",
+                        _fs->_namenode, _file, hdfsGetLastError());
+            }
+            left_bytes -= written_bytes;
+            p += written_bytes;
+            _bytes_appended += written_bytes;
+        }
+    }
+    return Status::OK();
+}
+
+// Call this method when there is no more data to write.
+// FIXME(cyx): Does not seem to be an appropriate interface for file system?
+Status HdfsFileWriter::finalize() {
+    DCHECK(!_closed);
+    if (_opened) {
+        RETURN_IF_ERROR(close());
+    }
+    return Status::OK();
+}
+
+Status HdfsFileWriter::_open() {

Review Comment:
   warning: use of undeclared identifier 'HdfsFileWriter' 
[clang-diagnostic-error]
   ```cpp
   Status HdfsFileWriter::_open() {
          ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status batch_upload(const std::vector<Path>& local_files,
+                            const std::vector<Path>& remote_files) {
+    std::vector<Path> remote_paths;
+    for (auto& path : remote_files) {
+        remote_paths.push_back(absolute_path(path));
+    }
+    if (bthread_self() == 0) {
+        return batch_upload_impl(local_files, remote_paths);
+    }
+    Status s;
+    auto task = [&] { s = batch_upload_impl(local_files, remote_paths); };

Review Comment:
   warning: use of undeclared identifier 'batch_upload_impl'; did you mean 
'batch_upload'? [clang-diagnostic-error]
   
   ```suggestion
       auto task = [&] { s = batch_upload(local_files, remote_paths); };
   ```
   **be/src/io/fs/remote_file_system.cpp:39:** 'batch_upload' declared here
   ```cpp
   Status batch_upload(const std::vector<Path>& local_files,
          ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status batch_upload(const std::vector<Path>& local_files,
+                            const std::vector<Path>& remote_files) {
+    std::vector<Path> remote_paths;
+    for (auto& path : remote_files) {
+        remote_paths.push_back(absolute_path(path));
+    }
+    if (bthread_self() == 0) {
+        return batch_upload_impl(local_files, remote_paths);
+    }
+    Status s;
+    auto task = [&] { s = batch_upload_impl(local_files, remote_paths); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status direct_upload(const Path& remote_file, const std::string& content) {
+    auto remote_path = absolute_path(remote_file);

Review Comment:
   warning: use of undeclared identifier 'absolute_path' 
[clang-diagnostic-error]
   ```cpp
       auto remote_path = absolute_path(remote_file);
                          ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)

Review Comment:
   warning: unknown type name 'HdfsFileSystem'; did you mean 
'io::HdfsFileSystem'? [clang-diagnostic-error]
   
   ```suggestion
   HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<io::HdfsFileSystem>& fs)
   ```
   **be/src/io/fs/hdfs_file_system.h:81:** 'io::HdfsFileSystem' declared here
   ```cpp
   class HdfsFileSystem final : public RemoteFileSystem {
         ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status batch_upload(const std::vector<Path>& local_files,
+                            const std::vector<Path>& remote_files) {
+    std::vector<Path> remote_paths;
+    for (auto& path : remote_files) {
+        remote_paths.push_back(absolute_path(path));
+    }
+    if (bthread_self() == 0) {
+        return batch_upload_impl(local_files, remote_paths);
+    }
+    Status s;
+    auto task = [&] { s = batch_upload_impl(local_files, remote_paths); };
+    AsyncIO::run_task(task, _type);

Review Comment:
   warning: use of undeclared identifier '_type' [clang-diagnostic-error]
   ```cpp
       AsyncIO::run_task(task, _type);
                               ^
   ```
   



##########
be/src/io/fs/hdfs_file_system.cpp:
##########
@@ -83,32 +84,25 @@ HdfsFileSystem::~HdfsFileSystem() {
     }
 }
 
-Status HdfsFileSystem::connect() {
+Status HdfsFileSystem::connect_impl() {
     
RETURN_IF_ERROR(HdfsFileSystemCache::instance()->get_connection(_hdfs_params, 
&_fs_handle));
     if (!_fs_handle) {
         return Status::InternalError("failed to init Hdfs handle with, please 
check hdfs params.");
     }
     return Status::OK();
 }
 
-Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* 
/*writer*/) {
-    // auto handle = get_handle();
-    // CHECK_HDFS_HANDLE(handle);
-    // auto hdfs_file = hdfsOpenFile(handle->hdfs_fs, path.string().c_str(), 
O_WRONLY, 0, 0, 0);
-    // if (hdfs_file == nullptr) {
-    //     return Status::InternalError("Failed to create file {}", 
path.string());
-    // }
-    // hdfsCloseFile(handle->hdfs_fs, hdfs_file);
-    // return Status::OK();
-    return Status::NotSupported("Currently not support to create file to 
HDFS");
+Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* 
writer) {
+    *writer = std::make_unique<HdfsWriter>(file, std::shared_from_this());    
+    return (*writer)->open();
 }
 
-Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader, 
IOContext* /*io_ctx*/) {
+Status HdfsFileSystem::open_file_internal(const Path& file, FileReaderSPtr* 
reader) {
     CHECK_HDFS_HANDLE(_fs_handle);
     size_t file_len = 0;
-    RETURN_IF_ERROR(file_size(path, &file_len));
+    RETURN_IF_ERROR(file_size_impl(file, &file_len));
 
-    Path real_path = _covert_path(path);
+    Path real_path = convert_path(file);

Review Comment:
   warning: use of undeclared identifier 'convert_path'; did you mean 
'covert_path'? [clang-diagnostic-error]
   
   ```suggestion
       Path real_path = covert_path(file);
   ```
   **be/src/util/hdfs_util.h:44:** 'covert_path' declared here
   ```cpp
   Path covert_path(const Path& path) const {
        ^
   ```
   



##########
be/src/io/fs/file_system.h:
##########
@@ -21,58 +21,111 @@
 
 #include "common/status.h"
 #include "gutil/macros.h"
-#include "io/fs/file_reader.h"
 #include "io/fs/file_reader_options.h"
-#include "io/fs/file_writer.h"
 #include "io/fs/path.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/io_common.h"
 
 namespace doris {
 namespace io {
 
-class FileWriter;
-class FileReader;
-
 enum class FileSystemType : uint8_t {
     LOCAL,
     S3,
     HDFS,
     BROKER,
 };
 
+struct FileInfo {
+    // only file name, no path
+    std::string file_name;
+    size_t file_size;
+    bool is_file;
+};
+
 class FileSystem : public std::enable_shared_from_this<FileSystem> {
 public:
+    // The following are public interface.
+    // And derived classes should implement all xxx_impl methods.
+    Status create_file(const Path& file, FileWriterPtr* writer);
+    Status open_file(const Path& file, const FileReaderOptions& reader_options,
+                             FileReaderSPtr* reader, IOContext* io_ctx);
+    Status create_directory(const Path& dir);
+    Status delete_file(const Path& file);
+    Status delete_directory(const Path& dir);
+    Status batch_delete(const std::vector<Path>& files);
+    Status exists(const Path& path, bool* res) const;
+    Status file_size(const Path& file, size_t* file_size) const;
+    Status list(const Path& dir, bool regular_file, std::vector<FileInfo>* 
files);
+    Status rename(const Path& orig_name, const Path& new_name);
+    Status rename_dir(const Path& orig_name, const Path& new_name);
+
+public:
+    // the root path of this fs.
+    // if not empty, all given Path will be "_root_path/path"
+    const Path& root_path() const { return _root_path; }
+    // a unique id of this fs.
+    // used for cache or re-use.
+    // can be empty if not used
+    const std::string& id() const { return _id; }
+    // file system type
+    FileSystemType type() const { return _type; }
+
     virtual ~FileSystem() = default;
 
+    // Each derived class should implement create method to create fs.
     DISALLOW_COPY_AND_ASSIGN(FileSystem);
 
-    virtual Status create_file(const Path& path, FileWriterPtr* writer) = 0;
+protected:
+    /// create file and return a FileWriter
+    virtual Status create_file_impl(const Path& file, FileWriterPtr* writer) = 
0;
 
-    virtual Status open_file(const Path& path, const FileReaderOptions& 
reader_options,
+    /// open file and return a FileReader
+    virtual Status open_file_impl(const Path& file, const FileReaderOptions& 
reader_options,
                              FileReaderSPtr* reader, IOContext* io_ctx) = 0;
 
-    virtual Status open_file(const Path& path, FileReaderSPtr* reader, 
IOContext* io_ctx) = 0;
+    /// create directory recursively
+    virtual Status create_directory_impl(const Path& dir) = 0;
 
-    virtual Status delete_file(const Path& path) = 0;
+    /// delete file.
+    /// return OK if file does not exist
+    /// return ERR if not a regular file
+    virtual Status delete_file_impl(const Path& file) = 0;
 
-    // create directory recursively
-    virtual Status create_directory(const Path& path) = 0;
+    /// delete all files in "files"
+    virtual Status batch_delete_impl(const std::vector<Path>& files) = 0;
 
-    // remove all under directory recursively
-    virtual Status delete_directory(const Path& path) = 0;
+    /// remove all under directory recursively
+    /// return OK if dir does not exist
+    /// return ERR if not a dir
+    virtual Status delete_directory_impl(const Path& dir) = 0;
 
-    // hard link `src` to `dest`
-    // FIXME(cyx): Should we move this method to LocalFileSystem?
-    virtual Status link_file(const Path& src, const Path& dest) = 0;
+    /// check if path exist
+    /// return OK and res = 1 means exist, res = 0 means does not exist
+    /// return ERR otherwise
+    virtual Status exists_impl(const Path& path, bool* res) const = 0;
 
-    virtual Status exists(const Path& path, bool* res) const = 0;
+    /// return OK and get size of given file, save in "file_size".
+    /// return ERR otherwise
+    virtual Status file_size_impl(const Path& file, size_t* file_size) const = 
0;
 
-    virtual Status file_size(const Path& path, size_t* file_size) const = 0;
+    /// return OK and list all objects in "dir", save in "files"
+    /// return ERR otherwise
+    /// will not traverse dir recursively.
+    virtual Status list_impl(const Path& dir, bool regular_file, 
std::vector<FileInfo>* files) = 0;
 
-    virtual Status list(const Path& path, std::vector<Path>* files) = 0;
+    /// rename file from orig_name to new_name
+    virtual Status rename_impl(const Path& orig_name, const Path& new_name) = 
0;
 
-    const Path& root_path() const { return _root_path; }
-    const std::string& id() const { return _id; }
-    FileSystemType type() const { return _type; }
+    /// rename dir from orig_name to new_name
+    virtual Status rename_dir_impl(const Path& orig_name, const Path& 
new_name) = 0;
+
+    Path absolute_path(const Path& path) const {
+        if (path.is_absolute()) {
+            return path;
+        }
+        return _root_path / path;
+    }
 
 protected:

Review Comment:
   warning: redundant access specifier has the same accessibility as the 
previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/io/fs/file_system.h:78:** previously declared here
   ```cpp
   protected:
   ^
   ```
   



##########
be/src/io/fs/hdfs_file_system.cpp:
##########
@@ -118,74 +112,70 @@
             _fs_handle->dec_ref();
             // retry
             RETURN_IF_ERROR(connect());
-            hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, 
path.string().c_str(), O_RDONLY, 0, 0, 0);
+            hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, 
file.string().c_str(), O_RDONLY, 0, 0, 0);
             if (hdfs_file == nullptr) {
                 return Status::InternalError(
-                        "open file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
-                        BackendOptions::get_localhost(), _namenode, 
path.string(),
+                        "open file failed. (BE: {}) namenode:{}, file:{}, err: 
{}",
+                        BackendOptions::get_localhost(), _namenode, 
file.string(),
                         hdfsGetLastError());
             }
         } else {
-            return Status::InternalError("open file failed. (BE: {}) 
namenode:{}, path:{}, err: {}",
-                                         BackendOptions::get_localhost(), 
_namenode, path.string(),
+            return Status::InternalError("open file failed. (BE: {}) 
namenode:{}, file:{}, err: {}",
+                                         BackendOptions::get_localhost(), 
_namenode, file.string(),
                                          hdfsGetLastError());
         }
     }
     *reader = std::make_shared<HdfsFileReader>(
-            path, file_len, _namenode, hdfs_file,
+            file, file_len, _namenode, hdfs_file,
             std::static_pointer_cast<HdfsFileSystem>(shared_from_this()));
     return Status::OK();
 }
 
-Status HdfsFileSystem::delete_file(const Path& path) {
+Status HdfsFileSystem::create_directory_impl(const Path& dir) {
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = _covert_path(path);
-    // The recursive argument `is_recursive` is irrelevant if path is a file.
-    int is_recursive = 0;
-    int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), 
is_recursive);
+    Path real_path = convert_path(dir);

Review Comment:
   warning: use of undeclared identifier 'convert_path'; did you mean 
'covert_path'? [clang-diagnostic-error]
   
   ```suggestion
       Path real_path = covert_path(dir);
   ```
   **be/src/util/hdfs_util.h:44:** 'covert_path' declared here
   ```cpp
   Path covert_path(const Path& path) const {
        ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)

Review Comment:
   warning: unknown type name 'Path'; did you mean 'io::Path'? 
[clang-diagnostic-error]
   
   ```suggestion
   HdfsFileWriter::HdfsFileWriter(const io::Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
   ```
   **be/src/io/fs/path.h:24:** 'io::Path' declared here
   ```cpp
   using Path = std::filesystem::path;
         ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
+        : FileWriter(file, fs){
+}
+
+HdfsFileWriter::~HdfsFileWriter() {
+    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << 
_closed;
+}
+
+Status HdfsFileWriter::close() {
+    if (_closed) {
+        return Status::OK();
+    }
+    _closed = true;
+    if (_hdfs_file == nullptr) {
+        return Status::OK();
+    }
+    int result = hdfsFlush(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    if (result == -1) {
+        std::stringstream ss;
+        ss << "failed to flush hdfs file. "
+           << "(BE: " << BackendOptions::get_localhost() << ")"
+           << "namenode:" << _fs->_namenode << " path:" << _file << ", err: " 
<< hdfsGetLastError();
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    hdfsCloseFile(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    _hdfs_file = nullptr;
+    return Status::OK();
+}
+
+Status HdfsFileWriter::abort() {

Review Comment:
   warning: use of undeclared identifier 'HdfsFileWriter' 
[clang-diagnostic-error]
   ```cpp
   Status HdfsFileWriter::abort() {
          ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
+        : FileWriter(file, fs){
+}
+
+HdfsFileWriter::~HdfsFileWriter() {

Review Comment:
   warning: use of undeclared identifier 'HdfsFileWriter' 
[clang-diagnostic-error]
   ```cpp
   HdfsFileWriter::~HdfsFileWriter() {
   ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
+        : FileWriter(file, fs){
+}
+
+HdfsFileWriter::~HdfsFileWriter() {
+    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << 
_closed;
+}
+
+Status HdfsFileWriter::close() {
+    if (_closed) {
+        return Status::OK();
+    }
+    _closed = true;
+    if (_hdfs_file == nullptr) {
+        return Status::OK();
+    }
+    int result = hdfsFlush(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    if (result == -1) {
+        std::stringstream ss;
+        ss << "failed to flush hdfs file. "
+           << "(BE: " << BackendOptions::get_localhost() << ")"
+           << "namenode:" << _fs->_namenode << " path:" << _file << ", err: " 
<< hdfsGetLastError();
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    hdfsCloseFile(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    _hdfs_file = nullptr;
+    return Status::OK();
+}
+
+Status HdfsFileWriter::abort() {
+    // TODO: should delete remote file
+    return Status::OK();
+}
+
+Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
+    DCHECK(!_closed);
+    if (!_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+
+    for (size_t i = 0; i < data_cnt; i++) {
+        const Slice& result = data[i];
+        size_t left_bytes = result.size;
+        const char* p = result.data;
+        while (left_bytes > 0) {
+            size_t written_bytes = hdfsWrite(_fs->_fs_handle->hdfs_fs, 
_hdfs_file, p, left_bytes);
+            if (written_bytes < 0) {
+                return Status::InternalError("write hdfs failed. namenode: {}, 
path: {}, error: {}",
+                        _fs->_namenode, _file, hdfsGetLastError());
+            }
+            left_bytes -= written_bytes;
+            p += written_bytes;
+            _bytes_appended += written_bytes;
+        }
+    }
+    return Status::OK();
+}
+
+// Call this method when there is no more data to write.
+// FIXME(cyx): Does not seem to be an appropriate interface for file system?
+Status HdfsFileWriter::finalize() {

Review Comment:
   warning: use of undeclared identifier 'HdfsFileWriter' 
[clang-diagnostic-error]
   ```cpp
   Status HdfsFileWriter::finalize() {
          ^
   ```
   



##########
be/src/io/fs/hdfs_file_system.cpp:
##########
@@ -118,74 +112,70 @@
             _fs_handle->dec_ref();
             // retry
             RETURN_IF_ERROR(connect());
-            hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, 
path.string().c_str(), O_RDONLY, 0, 0, 0);
+            hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, 
file.string().c_str(), O_RDONLY, 0, 0, 0);
             if (hdfs_file == nullptr) {
                 return Status::InternalError(
-                        "open file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
-                        BackendOptions::get_localhost(), _namenode, 
path.string(),
+                        "open file failed. (BE: {}) namenode:{}, file:{}, err: 
{}",
+                        BackendOptions::get_localhost(), _namenode, 
file.string(),
                         hdfsGetLastError());
             }
         } else {
-            return Status::InternalError("open file failed. (BE: {}) 
namenode:{}, path:{}, err: {}",
-                                         BackendOptions::get_localhost(), 
_namenode, path.string(),
+            return Status::InternalError("open file failed. (BE: {}) 
namenode:{}, file:{}, err: {}",
+                                         BackendOptions::get_localhost(), 
_namenode, file.string(),
                                          hdfsGetLastError());
         }
     }
     *reader = std::make_shared<HdfsFileReader>(
-            path, file_len, _namenode, hdfs_file,
+            file, file_len, _namenode, hdfs_file,
             std::static_pointer_cast<HdfsFileSystem>(shared_from_this()));
     return Status::OK();
 }
 
-Status HdfsFileSystem::delete_file(const Path& path) {
+Status HdfsFileSystem::create_directory_impl(const Path& dir) {
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = _covert_path(path);
-    // The recursive argument `is_recursive` is irrelevant if path is a file.
-    int is_recursive = 0;
-    int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), 
is_recursive);
+    Path real_path = convert_path(dir);
+    int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, 
real_path.string().c_str());
     if (res == -1) {
-        return Status::InternalError("Failed to delete file {}", 
path.string());
+        return Status::InternalError("Failed to create directory {}", 
dir.string());
     }
     return Status::OK();
 }
 
-Status HdfsFileSystem::create_directory(const Path& path) {
-    CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = _covert_path(path);
-    int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, 
real_path.string().c_str());
-    if (res == -1) {
-        return Status::InternalError("Failed to create directory {}", 
path.string());
+Status HdfsFileSystem::delete_file_impl(const Path& file) {
+    return delete_internal(file, 0);
+}
+
+Status HdfsFileSystem::delete_directory_impl(const Path& file) {
+    return delete_internal(file, 1);
+}
+
+Status HdfsFileSystem::batch_delete_impl(const std::vector<Path>& files) {
+    for (auto& file : files) {
+        RETURN_IF_ERROR(delete_file_impl(file));
     }
-    return Status::OK();
 }
 
-Status HdfsFileSystem::delete_directory(const Path& path) {
+Status HdfsFileSystem::delete_internal(const Path& path, int is_recursive) {
     CHECK_HDFS_HANDLE(_fs_handle);
-    Path real_path = _covert_path(path);
-    // delete in recursive mode
-    int is_recursive = 1;
+    Path real_path = convert_path(path);

Review Comment:
   warning: use of undeclared identifier 'convert_path'; did you mean 
'covert_path'? [clang-diagnostic-error]
   
   ```suggestion
       Path real_path = covert_path(path);
   ```
   **be/src/util/hdfs_util.h:44:** 'covert_path' declared here
   ```cpp
   Path covert_path(const Path& path) const {
        ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
+        : FileWriter(file, fs){
+}
+
+HdfsFileWriter::~HdfsFileWriter() {
+    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << 
_closed;
+}
+
+Status HdfsFileWriter::close() {
+    if (_closed) {
+        return Status::OK();
+    }
+    _closed = true;
+    if (_hdfs_file == nullptr) {
+        return Status::OK();
+    }
+    int result = hdfsFlush(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    if (result == -1) {
+        std::stringstream ss;
+        ss << "failed to flush hdfs file. "
+           << "(BE: " << BackendOptions::get_localhost() << ")"
+           << "namenode:" << _fs->_namenode << " path:" << _file << ", err: " 
<< hdfsGetLastError();
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    hdfsCloseFile(_fs->_fs_handle->hdfs_fs, _hdfs_file);
+    _hdfs_file = nullptr;
+    return Status::OK();
+}
+
+Status HdfsFileWriter::abort() {
+    // TODO: should delete remote file
+    return Status::OK();
+}
+
+Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {

Review Comment:
   warning: use of undeclared identifier 'HdfsFileWriter' 
[clang-diagnostic-error]
   ```cpp
   Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
          ^
   ```
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/hdfs_file_writer.h"
+
+#include <filesystem>
+
+#include "common/logging.h"
+#include "service/backend_options.h"
+#include "io/fs/hdfs_file_system.h"
+
+namespace doris {
+
+HdfsFileWriter::HdfsFileWriter(const Path& file, const 
std::shared_ptr<HdfsFileSystem>& fs)
+        : FileWriter(file, fs){
+}
+
+HdfsFileWriter::~HdfsFileWriter() {
+    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << 
_closed;
+}
+
+Status HdfsFileWriter::close() {

Review Comment:
   warning: use of undeclared identifier 'HdfsFileWriter' 
[clang-diagnostic-error]
   ```cpp
   Status HdfsFileWriter::close() {
          ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);

Review Comment:
   warning: use of undeclared identifier 'absolute_path' 
[clang-diagnostic-error]
   ```cpp
       auto dest_path = absolute_path(dest_file);
                        ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status batch_upload(const std::vector<Path>& local_files,
+                            const std::vector<Path>& remote_files) {
+    std::vector<Path> remote_paths;
+    for (auto& path : remote_files) {
+        remote_paths.push_back(absolute_path(path));

Review Comment:
   warning: use of undeclared identifier 'absolute_path' 
[clang-diagnostic-error]
   ```cpp
           remote_paths.push_back(absolute_path(path));
                                  ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status batch_upload(const std::vector<Path>& local_files,
+                            const std::vector<Path>& remote_files) {
+    std::vector<Path> remote_paths;
+    for (auto& path : remote_files) {
+        remote_paths.push_back(absolute_path(path));
+    }
+    if (bthread_self() == 0) {
+        return batch_upload_impl(local_files, remote_paths);

Review Comment:
   warning: use of undeclared identifier 'batch_upload_impl'; did you mean 
'batch_upload'? [clang-diagnostic-error]
   
   ```suggestion
           return batch_upload(local_files, remote_paths);
   ```
   **be/src/io/fs/remote_file_system.cpp:39:** 'batch_upload' declared here
   ```cpp
   Status batch_upload(const std::vector<Path>& local_files,
          ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status batch_upload(const std::vector<Path>& local_files,
+                            const std::vector<Path>& remote_files) {
+    std::vector<Path> remote_paths;
+    for (auto& path : remote_files) {
+        remote_paths.push_back(absolute_path(path));
+    }
+    if (bthread_self() == 0) {
+        return batch_upload_impl(local_files, remote_paths);
+    }
+    Status s;
+    auto task = [&] { s = batch_upload_impl(local_files, remote_paths); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status direct_upload(const Path& remote_file, const std::string& content) {
+    auto remote_path = absolute_path(remote_file);
+    if (bthread_self() == 0) {
+        return direct_upload_impl(remote_path, content);
+    }
+    Status s;
+    auto task = [&] { s = direct_upload_impl(remote_path, content); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status upload_with_checksum(const Path& local_file, const Path& remote,
+                                    const std::string& checksum) {
+    auto remote_path = absolute_path(remote);

Review Comment:
   warning: use of undeclared identifier 'absolute_path' 
[clang-diagnostic-error]
   ```cpp
       auto remote_path = absolute_path(remote);
                          ^
   ```
   



##########
be/src/io/fs/remote_file_system.cpp:
##########
@@ -26,21 +26,91 @@
 namespace doris {
 namespace io {
 
-Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& 
reader_options,
-                                   FileReaderSPtr* reader, IOContext* io_ctx) {
+Status upload(const Path& local_file, const Path& dest_file) {
+    auto dest_path = absolute_path(dest_file);
     if (bthread_self() == 0) {
-        return open_file_impl(path, reader_options, reader, io_ctx);
+        return upload_impl(local_file, dest_path);
     }
     Status s;
-    auto task = [&] { s = open_file_impl(path, reader_options, reader, 
io_ctx); };
-    AsyncIO::run_task(task, io::FileSystemType::S3);
+    auto task = [&] { s = upload_impl(local_file, dest_path); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status batch_upload(const std::vector<Path>& local_files,
+                            const std::vector<Path>& remote_files) {
+    std::vector<Path> remote_paths;
+    for (auto& path : remote_files) {
+        remote_paths.push_back(absolute_path(path));
+    }
+    if (bthread_self() == 0) {
+        return batch_upload_impl(local_files, remote_paths);
+    }
+    Status s;
+    auto task = [&] { s = batch_upload_impl(local_files, remote_paths); };
+    AsyncIO::run_task(task, _type);
+    return s;
+}
+
+Status direct_upload(const Path& remote_file, const std::string& content) {
+    auto remote_path = absolute_path(remote_file);
+    if (bthread_self() == 0) {
+        return direct_upload_impl(remote_path, content);
+    }
+    Status s;
+    auto task = [&] { s = direct_upload_impl(remote_path, content); };
+    AsyncIO::run_task(task, _type);

Review Comment:
   warning: use of undeclared identifier '_type' [clang-diagnostic-error]
   ```cpp
       AsyncIO::run_task(task, _type);
                               ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to