This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d42d83939 fix(bulk_load): fix core dump while downloading sst files
(#2077)
d42d83939 is described below
commit d42d839393059d43472c4ddc47745cb5e4347a47
Author: Pengfan Lu <[email protected]>
AuthorDate: Mon Mar 10 16:12:30 2025 +0800
fix(bulk_load): fix core dump while downloading sst files (#2077)
Fix https://github.com/apache/incubator-pegasus/issues/2006.
Fix the core dump by taking a read lock and avoiding direct references to `_metadata.files`.
---
src/replica/bulk_load/replica_bulk_loader.cpp | 52 +++++++++++++++------------
src/replica/bulk_load/replica_bulk_loader.h | 3 +-
2 files changed, 32 insertions(+), 23 deletions(-)
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 8b876b420..7aed63fe8 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -16,6 +16,7 @@
// under the License.
#include <fmt/core.h>
+#include <algorithm>
#include <functional>
#include <memory>
#include <string_view>
@@ -519,22 +520,32 @@ void replica_bulk_loader::download_files(const
std::string &provider_name,
}
// download sst files asynchronously
- if (!_metadata.files.empty()) {
- const file_meta &f_meta = _metadata.files[0];
- _download_files_task[f_meta.name] = tasking::enqueue(
- LPC_BACKGROUND_BULK_LOAD,
- tracker(),
- std::bind(&replica_bulk_loader::download_sst_file, this,
remote_dir, local_dir, 0, fs));
+ {
+ zauto_read_lock l(_lock);
+ if (!_metadata.files.empty()) {
+ _download_files_task[_metadata.files.back().name] =
tasking::enqueue(
+ LPC_BACKGROUND_BULK_LOAD,
+ tracker(),
+ [this, remote_dir, local_dir, file_meta = _metadata.files,
fs]() mutable {
+ this->download_sst_file(remote_dir, local_dir,
std::move(file_meta), fs);
+ });
+ }
}
}
// ThreadPool: THREAD_POOL_DEFAULT
-void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
- const std::string &local_dir,
- int32_t file_index,
-
dist::block_service::block_filesystem *fs)
+void replica_bulk_loader::download_sst_file(
+ const std::string &remote_dir,
+ const std::string &local_dir,
+ std::vector<::dsn::replication::file_meta> &&download_file_metas,
+ dist::block_service::block_filesystem *fs)
{
- const file_meta &f_meta = _metadata.files[file_index];
+ if (_status != bulk_load_status::BLS_DOWNLOADING) {
+ LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load
local_status is {}.",
+ enum_to_string(_status));
+ return;
+ }
+ const file_meta &f_meta = download_file_metas.back();
uint64_t f_size = 0;
std::string f_md5;
error_code ec = _stub->_block_service_manager.download_file(
@@ -590,17 +601,14 @@ void replica_bulk_loader::download_sst_file(const
std::string &remote_dir,
METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size);
// download next file
- if (file_index + 1 < _metadata.files.size()) {
- const file_meta &next_f_meta = _metadata.files[file_index + 1];
- _download_files_task[next_f_meta.name] =
- tasking::enqueue(LPC_BACKGROUND_BULK_LOAD,
- tracker(),
- std::bind(&replica_bulk_loader::download_sst_file,
- this,
- remote_dir,
- local_dir,
- file_index + 1,
- fs));
+ download_file_metas.pop_back();
+ if (!download_file_metas.empty()) {
+ _download_files_task[download_file_metas.back().name] =
tasking::enqueue(
+ LPC_BACKGROUND_BULK_LOAD,
+ tracker(),
+ [this, remote_dir, local_dir, download_file_metas, fs]() mutable {
+ this->download_sst_file(remote_dir, local_dir,
std::move(download_file_metas), fs);
+ });
}
}
diff --git a/src/replica/bulk_load/replica_bulk_loader.h
b/src/replica/bulk_load/replica_bulk_loader.h
index e01285e54..f53e5b69a 100644
--- a/src/replica/bulk_load/replica_bulk_loader.h
+++ b/src/replica/bulk_load/replica_bulk_loader.h
@@ -22,6 +22,7 @@
#include <map>
#include <ostream>
#include <string>
+#include <vector>
#include "bulk_load_types.h"
#include "common/replication_other_types.h"
@@ -89,7 +90,7 @@ private:
// download sst files from remote provider
void download_sst_file(const std::string &remote_dir,
const std::string &local_dir,
- int32_t file_index,
+ std::vector<::dsn::replication::file_meta>
&&download_file_metas,
dist::block_service::block_filesystem *fs);
// \return ERR_PATH_NOT_FOUND: file not exist
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]