This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d42d83939 fix(bulk_load): fix core dump while downloading sst files (#2077)
d42d83939 is described below

commit d42d839393059d43472c4ddc47745cb5e4347a47
Author: Pengfan Lu <[email protected]>
AuthorDate: Mon Mar 10 16:12:30 2025 +0800

    fix(bulk_load): fix core dump while downloading sst files (#2077)
    
    Fix https://github.com/apache/incubator-pegasus/issues/2006.
    
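    Fix the core dump by reading `_metadata.files` under a read lock and handing the download tasks their own copy of the file list, so they no longer reference `_metadata.files` directly.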
---
 src/replica/bulk_load/replica_bulk_loader.cpp | 52 +++++++++++++++------------
 src/replica/bulk_load/replica_bulk_loader.h   |  3 +-
 2 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp
index 8b876b420..7aed63fe8 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <fmt/core.h>
+#include <algorithm>
 #include <functional>
 #include <memory>
 #include <string_view>
@@ -519,22 +520,32 @@ void replica_bulk_loader::download_files(const std::string &provider_name,
     }
 
     // download sst files asynchronously
-    if (!_metadata.files.empty()) {
-        const file_meta &f_meta = _metadata.files[0];
-        _download_files_task[f_meta.name] = tasking::enqueue(
-            LPC_BACKGROUND_BULK_LOAD,
-            tracker(),
-            std::bind(&replica_bulk_loader::download_sst_file, this, 
remote_dir, local_dir, 0, fs));
+    {
+        zauto_read_lock l(_lock);
+        if (!_metadata.files.empty()) {
+            _download_files_task[_metadata.files.back().name] = 
tasking::enqueue(
+                LPC_BACKGROUND_BULK_LOAD,
+                tracker(),
+                [this, remote_dir, local_dir, file_meta = _metadata.files, 
fs]() mutable {
+                    this->download_sst_file(remote_dir, local_dir, 
std::move(file_meta), fs);
+                });
+        }
     }
 }
 
 // ThreadPool: THREAD_POOL_DEFAULT
-void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
-                                            const std::string &local_dir,
-                                            int32_t file_index,
-                                            
dist::block_service::block_filesystem *fs)
+void replica_bulk_loader::download_sst_file(
+    const std::string &remote_dir,
+    const std::string &local_dir,
+    std::vector<::dsn::replication::file_meta> &&download_file_metas,
+    dist::block_service::block_filesystem *fs)
 {
-    const file_meta &f_meta = _metadata.files[file_index];
+    if (_status != bulk_load_status::BLS_DOWNLOADING) {
+        LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load 
local_status is {}.",
+                           enum_to_string(_status));
+        return;
+    }
+    const file_meta &f_meta = download_file_metas.back();
     uint64_t f_size = 0;
     std::string f_md5;
     error_code ec = _stub->_block_service_manager.download_file(
@@ -590,17 +601,14 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
     METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size);
 
     // download next file
-    if (file_index + 1 < _metadata.files.size()) {
-        const file_meta &next_f_meta = _metadata.files[file_index + 1];
-        _download_files_task[next_f_meta.name] =
-            tasking::enqueue(LPC_BACKGROUND_BULK_LOAD,
-                             tracker(),
-                             std::bind(&replica_bulk_loader::download_sst_file,
-                                       this,
-                                       remote_dir,
-                                       local_dir,
-                                       file_index + 1,
-                                       fs));
+    download_file_metas.pop_back();
+    if (!download_file_metas.empty()) {
+        _download_files_task[download_file_metas.back().name] = 
tasking::enqueue(
+            LPC_BACKGROUND_BULK_LOAD,
+            tracker(),
+            [this, remote_dir, local_dir, download_file_metas, fs]() mutable {
+                this->download_sst_file(remote_dir, local_dir, 
std::move(download_file_metas), fs);
+            });
     }
 }
 
diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h
index e01285e54..f53e5b69a 100644
--- a/src/replica/bulk_load/replica_bulk_loader.h
+++ b/src/replica/bulk_load/replica_bulk_loader.h
@@ -22,6 +22,7 @@
 #include <map>
 #include <ostream>
 #include <string>
+#include <vector>
 
 #include "bulk_load_types.h"
 #include "common/replication_other_types.h"
@@ -89,7 +90,7 @@ private:
     // download sst files from remote provider
     void download_sst_file(const std::string &remote_dir,
                            const std::string &local_dir,
-                           int32_t file_index,
+                           std::vector<::dsn::replication::file_meta> 
&&download_file_metas,
                            dist::block_service::block_filesystem *fs);
 
     // \return ERR_PATH_NOT_FOUND: file not exist


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to