This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 35d0ed998cf699f59f71b82e2589d7d659a5ccb3 Author: Qi Chen <[email protected]> AuthorDate: Tue Apr 23 19:18:39 2024 +0800 [Fix](hdfs-writer) Fix hdfs file writer core with `check failed: _ref_cnt == 0` in dtor of `HdfsFileWriter`. (#33959) ## Issue: ``` F20240421 17:14:37.494115 184986 hdfs_util.h:65] Check failed: _ref_cnt == 0 *** Check failure stack trace: *** F20240421 17:14:37.505879 185108 hdfs_util.h:65] Check failed: _ref_cnt == 0 *** Check failure stack trace: *** @ 0x556f5236d316 google::LogMessageFatal::~LogMessageFatal() @ 0x556f5236d316 google::LogMessageFatal::~LogMessageFatal() @ 0x556f2830e200 doris::io::HdfsFileWriter::~HdfsFileWriter() @ 0x556f2830e200 doris::io::HdfsFileWriter::~HdfsFileWriter() @ 0x556f2830e21e doris::io::HdfsFileWriter::~HdfsFileWriter() @ 0x556f2830e21e doris::io::HdfsFileWriter::~HdfsFileWriter() @ 0x556f507893b0 doris::vectorized::VHivePartitionWriter::~VHivePartitionWriter() @ 0x556f507893b0 doris::vectorized::VHivePartitionWriter::~VHivePartitionWriter() @ 0x556f506c005e std::_Hashtable<>::clear() @ 0x556f506c005e std::_Hashtable<>::clear() @ 0x556f50780f4f doris::vectorized::VHiveTableWriter::close() @ 0x556f50780f4f doris::vectorized::VHiveTableWriter::close() @ 0x556f5072bc4f doris::vectorized::AsyncResultWriter::process_block() @ 0x556f5072bc4f doris::vectorized::AsyncResultWriter::process_block() @ 0x556f5072cd01 std::_Function_handler<>::_M_invoke() @ 0x556f5072cd01 std::_Function_handler<>::_M_invoke() @ 0x556f2b02b73d doris::ThreadPool::dispatch_thread() @ 0x556f2b02b73d doris::ThreadPool::dispatch_thread() @ 0x556f2b008d59 doris::Thread::supervise_thread() @ 0x556f2b008d59 doris::Thread::supervise_thread() @ 0x7f2c2bfb4609 start_thread @ 0x7f2c2bfb4609 start_thread @ 0x7f2c2c261133 clone @ 0x7f2c2c261133 clone @ (nil) (unknown) *** Query id: ac4f457c003d4489-b04ac56ef05b12f0 *** *** is nereids: 1 *** *** tablet id: 0 *** *** Aborted at 1713690877 (unix time) try "date -d @1713690877" if you are using GNU date 
*** *** Current BE git commitID: e6f4a2f *** *** SIGABRT unknown detail explain (@0x38a6) received by PID 14502 (TID 184986 OR 0x7f21d0614700) from PID 14502; stack trace: *** @ (nil) (unknown) F20240421 17:14:37.505879 185108 hdfs_util.h:65] Check failed: _ref_cnt == 0 F20240421 17:14:37.887202 185110 hdfs_util.h:65] Check failed: _ref_cnt == 0 *** Check failure stack trace: *** @ 0x556f5236d316 google::LogMessageFatal::~LogMessageFatal() @ 0x556f2830e200 doris::io::HdfsFileWriter::~HdfsFileWriter() @ 0x556f2830e21e doris::io::HdfsFileWriter::~HdfsFileWriter() @ 0x556f507893b0 doris::vectorized::VHivePartitionWriter::~VHivePartitionWriter() 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:421 ``` The root cause: when a handle cannot be served from the cache (for example, when the cache is full), a new `fs_handler` is created, and its lifetime is managed by the caller. Since the HDFS writer has been split out and can obtain its own `fs_handler`, either `HdfsFileSystem` or `HdfsFileWriter` may be that caller. In `HdfsFileSystem`, the `fs_handler` is shared with the `HdfsFileWriter`s it creates, so its ownership must be expressed with a `shared_ptr` instead of a raw pointer with manual reference counting. ### Solution Fix the HDFS file writer crash (core dump) caused by `Check failed: _ref_cnt == 0` in the destructor of `HdfsFileWriter`: change the `fs_handler` raw pointer to `shared_ptr` and remove the manual ref-count operations.
--- be/src/io/file_factory.cpp | 10 +++------- be/src/io/fs/hdfs_file_system.cpp | 12 +----------- be/src/io/fs/hdfs_file_system.h | 4 +--- be/src/io/fs/hdfs_file_writer.cpp | 11 +++-------- be/src/io/fs/hdfs_file_writer.h | 9 +++++---- be/src/io/hdfs_util.cpp | 20 ++++++++++---------- be/src/io/hdfs_util.h | 23 ++++++++--------------- 7 files changed, 31 insertions(+), 58 deletions(-) diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 167fcf31b76..d46e94db799 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -123,14 +123,10 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer( } case TFileType::FILE_HDFS: { THdfsParams hdfs_params = parse_properties(properties); - io::HdfsHandler* handler; + std::shared_ptr<io::HdfsHandler> handler; RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection( hdfs_params, hdfs_params.fs_name, &handler)); - auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options); - if (!res.has_value()) { - handler->dec_ref(); - } - return res; + return io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options); } default: return ResultError( @@ -165,7 +161,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader( }); } case TFileType::FILE_HDFS: { - io::HdfsHandler* handler; + std::shared_ptr<io::HdfsHandler> handler; // FIXME(plat1ko): Explain the difference between `system_properties.hdfs_params.fs_name` // and `file_description.fs_name`, it's so confused. 
const auto* fs_name = &file_description.fs_name; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index f86b3ef588a..24976f11941 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -86,15 +86,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_na } } -HdfsFileSystem::~HdfsFileSystem() { - if (_fs_handle != nullptr) { - if (_fs_handle->from_cache) { - _fs_handle->dec_ref(); - } else { - delete _fs_handle; - } - } -} +HdfsFileSystem::~HdfsFileSystem() = default; Status HdfsFileSystem::init() { RETURN_IF_ERROR( @@ -107,13 +99,11 @@ Status HdfsFileSystem::init() { Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { - _fs_handle->inc_ref(); auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name, opts); if (res.has_value()) { *writer = std::move(res).value(); return Status::OK(); } else { - _fs_handle->dec_ref(); return std::move(res).error(); } } diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index 23ae65b0820..f7c9ea22315 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -88,9 +88,7 @@ private: RuntimeProfile* profile, std::string root_path); const THdfsParams& _hdfs_params; // Only used in init, so we can use reference here std::string _fs_name; - // do not use std::shared_ptr or std::unique_ptr - // _fs_handle is managed by HdfsFileSystemCache - HdfsHandler* _fs_handle = nullptr; + std::shared_ptr<HdfsHandler> _fs_handle = nullptr; RuntimeProfile* _profile = nullptr; }; } // namespace io diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index b2ea4dace94..d15745ebf37 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -46,10 +46,10 @@ bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_writt static constexpr size_t MB = 1024 * 1024; 
-HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, +HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file, std::string fs_name, const FileWriterOptions* opts) : _path(std::move(path)), - _hdfs_handler(handler), + _hdfs_handler(std::move(handler)), _hdfs_file(hdfs_file), _fs_name(std::move(fs_name)), _sync_file_data(opts ? opts->sync_file_data : true), @@ -71,11 +71,6 @@ HdfsFileWriter::~HdfsFileWriter() { hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); } - if (_hdfs_handler->from_cache) { - _hdfs_handler->dec_ref(); - } else { - delete _hdfs_handler; - } hdfs_file_being_written << -1; } @@ -282,7 +277,7 @@ Status HdfsFileWriter::finalize() { return Status::OK(); } -Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handler, +Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler, const std::string& fs_name, const FileWriterOptions* opts) { auto path = convert_path(full_path, fs_name); diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index e6aa623cada..f6fa66c5dbe 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -36,11 +36,12 @@ public: // - fs_name/path_to_file // - /path_to_file // TODO(plat1ko): Support related path for cloud mode - static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, const std::string& fs_name, + static Result<FileWriterPtr> create(Path path, std::shared_ptr<HdfsHandler> handler, + const std::string& fs_name, const FileWriterOptions* opts = nullptr); - HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name, - const FileWriterOptions* opts = nullptr); + HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file, + std::string fs_name, const FileWriterOptions* opts = nullptr); ~HdfsFileWriter() override; Status close() override; @@ -59,7 +60,7 @@ private: Status 
_append(std::string_view content); Path _path; - HdfsHandler* _hdfs_handler = nullptr; + std::shared_ptr<HdfsHandler> _hdfs_handler = nullptr; hdfsFile _hdfs_file = nullptr; std::string _fs_name; size_t _bytes_appended = 0; diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp index 0ae5d2f371b..a9563a82139 100644 --- a/be/src/io/hdfs_util.cpp +++ b/be/src/io/hdfs_util.cpp @@ -81,7 +81,7 @@ bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync"); void HdfsHandlerCache::_clean_invalid() { std::vector<uint64> removed_handle; for (auto& item : _cache) { - if (item.second->invalid() && item.second->ref_cnt() == 0) { + if (item.second.use_count() == 1 && item.second->invalid()) { removed_handle.emplace_back(item.first); } } @@ -94,7 +94,7 @@ void HdfsHandlerCache::_clean_oldest() { uint64_t oldest_time = ULONG_MAX; uint64 oldest = 0; for (auto& item : _cache) { - if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) { + if (item.second.use_count() == 1 && item.second->last_access_time() < oldest_time) { oldest_time = item.second->last_access_time(); oldest = item.first; } @@ -103,16 +103,16 @@ void HdfsHandlerCache::_clean_oldest() { } Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsHandler** fs_handle) { + std::shared_ptr<HdfsHandler>* fs_handle) { uint64 hash_code = hdfs_hash_code(hdfs_params); { std::lock_guard<std::mutex> l(_lock); auto it = _cache.find(hash_code); if (it != _cache.end()) { - HdfsHandler* handle = it->second.get(); + std::shared_ptr<HdfsHandler> handle = it->second; if (!handle->invalid()) { - handle->inc_ref(); - *fs_handle = handle; + handle->update_last_access_time(); + *fs_handle = std::move(handle); return Status::OK(); } // fs handle is invalid, erase it. 
@@ -129,12 +129,12 @@ Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const st _clean_oldest(); } if (_cache.size() < MAX_CACHE_HANDLE) { - std::unique_ptr<HdfsHandler> handle = std::make_unique<HdfsHandler>(hdfs_fs, true); - handle->inc_ref(); - *fs_handle = handle.get(); + auto handle = std::make_shared<HdfsHandler>(hdfs_fs, true); + handle->update_last_access_time(); + *fs_handle = handle; _cache[hash_code] = std::move(handle); } else { - *fs_handle = new HdfsHandler(hdfs_fs, false); + *fs_handle = std::make_shared<HdfsHandler>(hdfs_fs, false); } } return Status::OK(); diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h index f450063c7dc..c8d25d1f30a 100644 --- a/be/src/io/hdfs_util.h +++ b/be/src/io/hdfs_util.h @@ -54,7 +54,6 @@ public: HdfsHandler(hdfsFS fs, bool cached) : hdfs_fs(fs), from_cache(cached), - _ref_cnt(0), _create_time(std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch()) .count()), @@ -62,7 +61,6 @@ public: _invalid(false) {} ~HdfsHandler() { - DCHECK(_ref_cnt == 0); if (hdfs_fs != nullptr) { // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed" // even if we create a new one @@ -73,17 +71,14 @@ public: int64_t last_access_time() { return _last_access_time; } - void inc_ref() { - _ref_cnt++; - _last_access_time = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + void update_last_access_time() { + if (from_cache) { + _last_access_time = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } } - void dec_ref() { _ref_cnt--; } - - int ref_cnt() { return _ref_cnt; } - bool invalid() { return _invalid; } void set_invalid() { _invalid = true; } @@ -94,8 +89,6 @@ public: const bool from_cache; private: - // the number of referenced client - std::atomic<int> _ref_cnt; // For kerberos authentication, we need to 
save create time so that // we can know if the kerberos ticket is expired. std::atomic<uint64_t> _create_time; @@ -118,13 +111,13 @@ public: // This function is thread-safe Status get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsHandler** fs_handle); + std::shared_ptr<HdfsHandler>* fs_handle); private: static constexpr int MAX_CACHE_HANDLE = 64; std::mutex _lock; - std::unordered_map<uint64_t, std::unique_ptr<HdfsHandler>> _cache; + std::unordered_map<uint64_t, std::shared_ptr<HdfsHandler>> _cache; HdfsHandlerCache() = default; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
