This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 35d0ed998cf699f59f71b82e2589d7d659a5ccb3
Author: Qi Chen <[email protected]>
AuthorDate: Tue Apr 23 19:18:39 2024 +0800

    [Fix](hdfs-writer) Fix hdfs file writer core with `check failed: _ref_cnt == 0` in dtor of `HdfsFileWriter`. (#33959)
    
    ## Issue:
    ```
    F20240421 17:14:37.494115 184986 hdfs_util.h:65] Check failed: _ref_cnt == 0
    *** Check failure stack trace: ***
    F20240421 17:14:37.505879 185108 hdfs_util.h:65] Check failed: _ref_cnt == 0
    *** Check failure stack trace: ***
        @     0x556f5236d316  google::LogMessageFatal::~LogMessageFatal()
        @     0x556f5236d316  google::LogMessageFatal::~LogMessageFatal()
        @     0x556f2830e200  doris::io::HdfsFileWriter::~HdfsFileWriter()
        @     0x556f2830e200  doris::io::HdfsFileWriter::~HdfsFileWriter()
        @     0x556f2830e21e  doris::io::HdfsFileWriter::~HdfsFileWriter()
        @     0x556f2830e21e  doris::io::HdfsFileWriter::~HdfsFileWriter()
        @     0x556f507893b0  
doris::vectorized::VHivePartitionWriter::~VHivePartitionWriter()
        @     0x556f507893b0  
doris::vectorized::VHivePartitionWriter::~VHivePartitionWriter()
        @     0x556f506c005e  std::_Hashtable<>::clear()
        @     0x556f506c005e  std::_Hashtable<>::clear()
        @     0x556f50780f4f  doris::vectorized::VHiveTableWriter::close()
        @     0x556f50780f4f  doris::vectorized::VHiveTableWriter::close()
        @     0x556f5072bc4f  
doris::vectorized::AsyncResultWriter::process_block()
        @     0x556f5072bc4f  
doris::vectorized::AsyncResultWriter::process_block()
        @     0x556f5072cd01  std::_Function_handler<>::_M_invoke()
        @     0x556f5072cd01  std::_Function_handler<>::_M_invoke()
        @     0x556f2b02b73d  doris::ThreadPool::dispatch_thread()
        @     0x556f2b02b73d  doris::ThreadPool::dispatch_thread()
        @     0x556f2b008d59  doris::Thread::supervise_thread()
        @     0x556f2b008d59  doris::Thread::supervise_thread()
        @     0x7f2c2bfb4609  start_thread
        @     0x7f2c2bfb4609  start_thread
        @     0x7f2c2c261133  clone
        @     0x7f2c2c261133  clone
        @              (nil)  (unknown)
    *** Query id: ac4f457c003d4489-b04ac56ef05b12f0 ***
    *** is nereids: 1 ***
    *** tablet id: 0 ***
    *** Aborted at 1713690877 (unix time) try "date -d @1713690877" if you are 
using GNU date ***
    *** Current BE git commitID: e6f4a2f ***
    *** SIGABRT unknown detail explain (@0x38a6) received by PID 14502 (TID 
184986 OR 0x7f21d0614700) from PID 14502; stack trace: ***
        @              (nil)  (unknown)
    F20240421 17:14:37.505879 185108 hdfs_util.h:65] Check failed: _ref_cnt == 
0 F20240421 17:14:37.887202 185110 hdfs_util.h:65] Check failed: _ref_cnt == 0
    *** Check failure stack trace: ***
        @     0x556f5236d316  google::LogMessageFatal::~LogMessageFatal()
        @     0x556f2830e200  doris::io::HdfsFileWriter::~HdfsFileWriter()
        @     0x556f2830e21e  doris::io::HdfsFileWriter::~HdfsFileWriter()
        @     0x556f507893b0  
doris::vectorized::VHivePartitionWriter::~VHivePartitionWriter()
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, 
siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:421
    ```
    The root cause is:
    When a handler cannot be served from the cache (for example, when the
    cache is full), a new `fs_handler` is created and its life cycle is
    managed by the caller. Since the hdfs writer has been separated out and
    can create an `fs_handler` on its own, either `HdfsFileSystem` or
    `HdfsFileWriter` may be that caller. When a writer is created through
    `HdfsFileSystem`, the same `fs_handler` is shared by the `HdfsFileSystem`
    and the `HdfsFileWriter`, and each of them tries to manage its life cycle,
    so it needs to be changed to a `shared_ptr`.
    
    ### Solution
    Fix the hdfs file writer core dump with `check failed: _ref_cnt == 0` in
    the dtor of `HdfsFileWriter`.
    Change the `fs_handler` raw pointer to a `shared_ptr` and remove the
    manual ref count operations.
---
 be/src/io/file_factory.cpp        | 10 +++-------
 be/src/io/fs/hdfs_file_system.cpp | 12 +-----------
 be/src/io/fs/hdfs_file_system.h   |  4 +---
 be/src/io/fs/hdfs_file_writer.cpp | 11 +++--------
 be/src/io/fs/hdfs_file_writer.h   |  9 +++++----
 be/src/io/hdfs_util.cpp           | 20 ++++++++++----------
 be/src/io/hdfs_util.h             | 23 ++++++++---------------
 7 files changed, 31 insertions(+), 58 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 167fcf31b76..d46e94db799 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -123,14 +123,10 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer(
     }
     case TFileType::FILE_HDFS: {
         THdfsParams hdfs_params = parse_properties(properties);
-        io::HdfsHandler* handler;
+        std::shared_ptr<io::HdfsHandler> handler;
         
RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection(
                 hdfs_params, hdfs_params.fs_name, &handler));
-        auto res = io::HdfsFileWriter::create(path, handler, 
hdfs_params.fs_name, &options);
-        if (!res.has_value()) {
-            handler->dec_ref();
-        }
-        return res;
+        return io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, 
&options);
     }
     default:
         return ResultError(
@@ -165,7 +161,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
                 });
     }
     case TFileType::FILE_HDFS: {
-        io::HdfsHandler* handler;
+        std::shared_ptr<io::HdfsHandler> handler;
         // FIXME(plat1ko): Explain the difference between 
`system_properties.hdfs_params.fs_name`
         // and `file_description.fs_name`, it's so confused.
         const auto* fs_name = &file_description.fs_name;
diff --git a/be/src/io/fs/hdfs_file_system.cpp 
b/be/src/io/fs/hdfs_file_system.cpp
index f86b3ef588a..24976f11941 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -86,15 +86,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams& 
hdfs_params, std::string fs_na
     }
 }
 
-HdfsFileSystem::~HdfsFileSystem() {
-    if (_fs_handle != nullptr) {
-        if (_fs_handle->from_cache) {
-            _fs_handle->dec_ref();
-        } else {
-            delete _fs_handle;
-        }
-    }
-}
+HdfsFileSystem::~HdfsFileSystem() = default;
 
 Status HdfsFileSystem::init() {
     RETURN_IF_ERROR(
@@ -107,13 +99,11 @@ Status HdfsFileSystem::init() {
 
 Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* 
writer,
                                         const FileWriterOptions* opts) {
-    _fs_handle->inc_ref();
     auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name, opts);
     if (res.has_value()) {
         *writer = std::move(res).value();
         return Status::OK();
     } else {
-        _fs_handle->dec_ref();
         return std::move(res).error();
     }
 }
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index 23ae65b0820..f7c9ea22315 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -88,9 +88,7 @@ private:
                    RuntimeProfile* profile, std::string root_path);
     const THdfsParams& _hdfs_params; // Only used in init, so we can use 
reference here
     std::string _fs_name;
-    // do not use std::shared_ptr or std::unique_ptr
-    // _fs_handle is managed by HdfsFileSystemCache
-    HdfsHandler* _fs_handle = nullptr;
+    std::shared_ptr<HdfsHandler> _fs_handle = nullptr;
     RuntimeProfile* _profile = nullptr;
 };
 } // namespace io
diff --git a/be/src/io/fs/hdfs_file_writer.cpp 
b/be/src/io/fs/hdfs_file_writer.cpp
index b2ea4dace94..d15745ebf37 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -46,10 +46,10 @@ bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_writt
 
 static constexpr size_t MB = 1024 * 1024;
 
-HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile 
hdfs_file,
+HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> 
handler, hdfsFile hdfs_file,
                                std::string fs_name, const FileWriterOptions* 
opts)
         : _path(std::move(path)),
-          _hdfs_handler(handler),
+          _hdfs_handler(std::move(handler)),
           _hdfs_file(hdfs_file),
           _fs_name(std::move(fs_name)),
           _sync_file_data(opts ? opts->sync_file_data : true),
@@ -71,11 +71,6 @@ HdfsFileWriter::~HdfsFileWriter() {
         hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
     }
 
-    if (_hdfs_handler->from_cache) {
-        _hdfs_handler->dec_ref();
-    } else {
-        delete _hdfs_handler;
-    }
     hdfs_file_being_written << -1;
 }
 
@@ -282,7 +277,7 @@ Status HdfsFileWriter::finalize() {
     return Status::OK();
 }
 
-Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* 
handler,
+Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, 
std::shared_ptr<HdfsHandler> handler,
                                              const std::string& fs_name,
                                              const FileWriterOptions* opts) {
     auto path = convert_path(full_path, fs_name);
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index e6aa623cada..f6fa66c5dbe 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -36,11 +36,12 @@ public:
     // - fs_name/path_to_file
     // - /path_to_file
     // TODO(plat1ko): Support related path for cloud mode
-    static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, const 
std::string& fs_name,
+    static Result<FileWriterPtr> create(Path path, 
std::shared_ptr<HdfsHandler> handler,
+                                        const std::string& fs_name,
                                         const FileWriterOptions* opts = 
nullptr);
 
-    HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, 
std::string fs_name,
-                   const FileWriterOptions* opts = nullptr);
+    HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile 
hdfs_file,
+                   std::string fs_name, const FileWriterOptions* opts = 
nullptr);
     ~HdfsFileWriter() override;
 
     Status close() override;
@@ -59,7 +60,7 @@ private:
     Status _append(std::string_view content);
 
     Path _path;
-    HdfsHandler* _hdfs_handler = nullptr;
+    std::shared_ptr<HdfsHandler> _hdfs_handler = nullptr;
     hdfsFile _hdfs_file = nullptr;
     std::string _fs_name;
     size_t _bytes_appended = 0;
diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp
index 0ae5d2f371b..a9563a82139 100644
--- a/be/src/io/hdfs_util.cpp
+++ b/be/src/io/hdfs_util.cpp
@@ -81,7 +81,7 @@ bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
 void HdfsHandlerCache::_clean_invalid() {
     std::vector<uint64> removed_handle;
     for (auto& item : _cache) {
-        if (item.second->invalid() && item.second->ref_cnt() == 0) {
+        if (item.second.use_count() == 1 && item.second->invalid()) {
             removed_handle.emplace_back(item.first);
         }
     }
@@ -94,7 +94,7 @@ void HdfsHandlerCache::_clean_oldest() {
     uint64_t oldest_time = ULONG_MAX;
     uint64 oldest = 0;
     for (auto& item : _cache) {
-        if (item.second->ref_cnt() == 0 && item.second->last_access_time() < 
oldest_time) {
+        if (item.second.use_count() == 1 && item.second->last_access_time() < 
oldest_time) {
             oldest_time = item.second->last_access_time();
             oldest = item.first;
         }
@@ -103,16 +103,16 @@ void HdfsHandlerCache::_clean_oldest() {
 }
 
 Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const 
std::string& fs_name,
-                                        HdfsHandler** fs_handle) {
+                                        std::shared_ptr<HdfsHandler>* 
fs_handle) {
     uint64 hash_code = hdfs_hash_code(hdfs_params);
     {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _cache.find(hash_code);
         if (it != _cache.end()) {
-            HdfsHandler* handle = it->second.get();
+            std::shared_ptr<HdfsHandler> handle = it->second;
             if (!handle->invalid()) {
-                handle->inc_ref();
-                *fs_handle = handle;
+                handle->update_last_access_time();
+                *fs_handle = std::move(handle);
                 return Status::OK();
             }
             // fs handle is invalid, erase it.
@@ -129,12 +129,12 @@ Status HdfsHandlerCache::get_connection(const 
THdfsParams& hdfs_params, const st
             _clean_oldest();
         }
         if (_cache.size() < MAX_CACHE_HANDLE) {
-            std::unique_ptr<HdfsHandler> handle = 
std::make_unique<HdfsHandler>(hdfs_fs, true);
-            handle->inc_ref();
-            *fs_handle = handle.get();
+            auto handle = std::make_shared<HdfsHandler>(hdfs_fs, true);
+            handle->update_last_access_time();
+            *fs_handle = handle;
             _cache[hash_code] = std::move(handle);
         } else {
-            *fs_handle = new HdfsHandler(hdfs_fs, false);
+            *fs_handle = std::make_shared<HdfsHandler>(hdfs_fs, false);
         }
     }
     return Status::OK();
diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h
index f450063c7dc..c8d25d1f30a 100644
--- a/be/src/io/hdfs_util.h
+++ b/be/src/io/hdfs_util.h
@@ -54,7 +54,6 @@ public:
     HdfsHandler(hdfsFS fs, bool cached)
             : hdfs_fs(fs),
               from_cache(cached),
-              _ref_cnt(0),
               
_create_time(std::chrono::duration_cast<std::chrono::milliseconds>(
                                    
std::chrono::system_clock::now().time_since_epoch())
                                    .count()),
@@ -62,7 +61,6 @@ public:
               _invalid(false) {}
 
     ~HdfsHandler() {
-        DCHECK(_ref_cnt == 0);
         if (hdfs_fs != nullptr) {
             // DO NOT call hdfsDisconnect(), or we will meet "Filesystem 
closed"
             // even if we create a new one
@@ -73,17 +71,14 @@ public:
 
     int64_t last_access_time() { return _last_access_time; }
 
-    void inc_ref() {
-        _ref_cnt++;
-        _last_access_time = 
std::chrono::duration_cast<std::chrono::milliseconds>(
-                                    
std::chrono::system_clock::now().time_since_epoch())
-                                    .count();
+    void update_last_access_time() {
+        if (from_cache) {
+            _last_access_time = 
std::chrono::duration_cast<std::chrono::milliseconds>(
+                                        
std::chrono::system_clock::now().time_since_epoch())
+                                        .count();
+        }
     }
 
-    void dec_ref() { _ref_cnt--; }
-
-    int ref_cnt() { return _ref_cnt; }
-
     bool invalid() { return _invalid; }
 
     void set_invalid() { _invalid = true; }
@@ -94,8 +89,6 @@ public:
     const bool from_cache;
 
 private:
-    // the number of referenced client
-    std::atomic<int> _ref_cnt;
     // For kerberos authentication, we need to save create time so that
     // we can know if the kerberos ticket is expired.
     std::atomic<uint64_t> _create_time;
@@ -118,13 +111,13 @@ public:
 
     // This function is thread-safe
     Status get_connection(const THdfsParams& hdfs_params, const std::string& 
fs_name,
-                          HdfsHandler** fs_handle);
+                          std::shared_ptr<HdfsHandler>* fs_handle);
 
 private:
     static constexpr int MAX_CACHE_HANDLE = 64;
 
     std::mutex _lock;
-    std::unordered_map<uint64_t, std::unique_ptr<HdfsHandler>> _cache;
+    std::unordered_map<uint64_t, std::shared_ptr<HdfsHandler>> _cache;
 
     HdfsHandlerCache() = default;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to