xy720 commented on code in PR #43642:
URL: https://github.com/apache/doris/pull/43642#discussion_r1860346804
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type
type, const std::string& l
SnapshotLoader::~SnapshotLoader() = default;
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+ const std::string& dir, const
std::string& rowset,
+ std::vector<std::string>*
remote_files) {
+ bool exists = true;
+ std::vector<io::FileInfo> files;
+ RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+ for (auto& tmp_file : files) {
+ io::Path path(tmp_file.file_name);
+ std::string file_name = path.filename();
+
+ if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+ !_end_with(file_name, ".idx")) {
+ continue;
+ }
+ remote_files->push_back(file_name);
+ }
+
+ return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+ io::RemoteFileSystem* cold_fs,
+ const std::string& remote_seg_path,
+ const std::string& local_seg_path,
+ const std::string& dest_seg_path) {
+ RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+ // calc md5sum of localfile
+ std::string md5sum;
+ RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path,
&md5sum));
+
+ RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path,
dest_seg_path, md5sum));
+
+ //delete local file
+
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+ return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ io::RemoteFileSystem* cold_fs, const
std::string& rowset,
+ int segments, int have_inverted_index) {
+ Status res = Status::OK();
+
+ std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX,
tablet_id);
+
+ for (int i = 0; i < segments; i++) {
+ std::string remote_seg_path = fmt::format("{}/{}_{}.dat",
remote_tablet_path, rowset, i);
+ std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path,
rowset, i);
+ std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path,
rowset, i);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_seg_path,
+ local_seg_path,
dest_seg_path));
+ }
+
+ if (!have_inverted_index) {
+ return res;
+ }
+
+ std::vector<std::string> remote_index_files;
+ RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs,
remote_tablet_path, rowset,
+ &remote_index_files));
+
+ for (auto& index_file : remote_index_files) {
+ std::string remote_index_path = fmt::format("{}/{}",
remote_tablet_path, index_file);
+ std::string local_seg_path = fmt::format("{}/{}", local_path,
index_file);
+ std::string dest_seg_path = fmt::format("{}/{}", dest_path,
index_file);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_index_path,
+ local_seg_path,
dest_seg_path));
+ }
+ return res;
+}
+
+static Status upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ const std::string& remote_file) {
+ io::FileReaderSPtr file_reader;
+ Status res = Status::OK();
+
+ std::string full_remote_path = local_path + '/' + remote_file;
Review Comment:
The name full_remote_path is confusing: the path is built from local_path, so it refers to a local file.
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type
type, const std::string& l
SnapshotLoader::~SnapshotLoader() = default;
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+ const std::string& dir, const
std::string& rowset,
+ std::vector<std::string>*
remote_files) {
+ bool exists = true;
+ std::vector<io::FileInfo> files;
+ RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+ for (auto& tmp_file : files) {
+ io::Path path(tmp_file.file_name);
+ std::string file_name = path.filename();
+
+ if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+ !_end_with(file_name, ".idx")) {
+ continue;
+ }
+ remote_files->push_back(file_name);
+ }
+
+ return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+ io::RemoteFileSystem* cold_fs,
+ const std::string& remote_seg_path,
+ const std::string& local_seg_path,
+ const std::string& dest_seg_path) {
+ RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+ // calc md5sum of localfile
+ std::string md5sum;
+ RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path,
&md5sum));
+
+ RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path,
dest_seg_path, md5sum));
+
+ //delete local file
+
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+ return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ io::RemoteFileSystem* cold_fs, const
std::string& rowset,
+ int segments, int have_inverted_index) {
+ Status res = Status::OK();
+
+ std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX,
tablet_id);
+
+ for (int i = 0; i < segments; i++) {
+ std::string remote_seg_path = fmt::format("{}/{}_{}.dat",
remote_tablet_path, rowset, i);
+ std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path,
rowset, i);
+ std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path,
rowset, i);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_seg_path,
+ local_seg_path,
dest_seg_path));
+ }
+
+ if (!have_inverted_index) {
+ return res;
+ }
+
+ std::vector<std::string> remote_index_files;
+ RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs,
remote_tablet_path, rowset,
+ &remote_index_files));
+
+ for (auto& index_file : remote_index_files) {
+ std::string remote_index_path = fmt::format("{}/{}",
remote_tablet_path, index_file);
+ std::string local_seg_path = fmt::format("{}/{}", local_path,
index_file);
+ std::string dest_seg_path = fmt::format("{}/{}", dest_path,
index_file);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_index_path,
+ local_seg_path,
dest_seg_path));
+ }
+ return res;
+}
+
+static Status upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ const std::string& remote_file) {
Review Comment:
The name remote_file is confusing; maybe local_remote_info_file_path?
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -168,7 +300,10 @@ Status SnapshotLoader::upload(const std::map<std::string,
std::string>& src_to_d
for (auto& local_file : local_files) {
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num,
total_num,
TTaskType::type::UPLOAD));
-
+ if (local_file.compare("remote_file_info") == 0) {
+ RETURN_IF_ERROR(upload_remote_file(*_remote_fs, tablet_id,
src_path, dest_path,
Review Comment:
Here it uploads the whole tablet's files in one function. Why?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]