xy720 commented on code in PR #43642:
URL: https://github.com/apache/doris/pull/43642#discussion_r1849846090
##########
fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java:
##########
@@ -212,6 +226,27 @@ public void analyzeProperties() throws AnalysisException {
// is atomic restore
isAtomicRestore = eatBooleanProperty(copiedProperties,
PROP_ATOMIC_RESTORE, isAtomicRestore);
+ if (copiedProperties.containsKey(PROP_STORAGE_RESOURCE)) {
+ storageResource = copiedProperties.get(PROP_STORAGE_RESOURCE);
+ Resource localResource =
Env.getCurrentEnv().getResourceMgr().getResource(storageResource);
+
+ if (localResource == null) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
+ "Restore storage resource " + storageResource + " is
not exist");
+ }
+
+ if (localResource.getType() != Resource.ResourceType.S3) {
Review Comment:
What about HDFS resources?
##########
fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java:
##########
@@ -52,18 +53,36 @@ public class BackupMeta implements Writable,
GsonPostProcessable {
// resource name -> resource
@SerializedName(value = "resourceNameMap")
private Map<String, Resource> resourceNameMap = Maps.newHashMap();
+ // storagePolicy name -> resource
+ @SerializedName(value = "storagePolicyNameMap")
+ private Map<String, StoragePolicy> storagePolicyNameMap =
Maps.newHashMap();
private BackupMeta() {
}
- public BackupMeta(List<Table> tables, List<Resource> resources) {
+ public BackupMeta(List<Table> tables, List<Resource> resources,
List<StoragePolicy> storagePolicys) {
Review Comment:
```suggestion
public BackupMeta(List<Table> tables, List<Resource> resources,
List<StoragePolicy> storagePolicies) {
```
##########
fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java:
##########
@@ -665,6 +686,32 @@ private void checkAndPrepareMeta() {
}
}
+ for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo :
jobInfo.newBackupObjects.s3Resources) {
+ Resource resource =
Env.getCurrentEnv().getResourceMgr().getResource(storageResource != null
+ ? storageResource : backupS3ResourceInfo.name);
+ if (resource == null) {
+ continue;
+ }
+ if (resource.getType() != Resource.ResourceType.S3) {
Review Comment:
What about the backup resource type?
##########
fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java:
##########
@@ -1253,14 +1318,114 @@ private void checkAndRestoreResources() {
} else {
try {
// restore resource
- resourceMgr.createResource(remoteOdbcResource, false);
+ resourceMgr.createResource(remoteOdbcResource);
} catch (DdlException e) {
status = new Status(ErrCode.COMMON_ERROR, e.getMessage());
return;
}
restoredResources.add(remoteOdbcResource);
}
}
+
+ if (!reserveStoragePolicy) {
+ return;
+ }
+
+ for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo :
jobInfo.newBackupObjects.s3Resources) {
+ String backupResourceName = backupS3ResourceInfo.name;
+ Resource localResource = resourceMgr.getResource(storageResource
!= null
Review Comment:
There should be multiple local resources, one for each partition.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java:
##########
@@ -429,15 +429,17 @@ public void resetPartitionIdForRestore(
idToStoragePolicy = Maps.newHashMap();
for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) {
- idToDataProperty.put(entry.getKey(),
origIdToDataProperty.get(entry.getValue()));
+ idToDataProperty.put(entry.getKey(), reserveStoragePolicy
+ ? origIdToDataProperty.get(entry.getValue()) :
DataProperty.DEFAULT_HDD_DATA_PROPERTY);
Review Comment:
What if there is no HDD medium?
##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -596,6 +647,9 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
new_tablet_meta->revise_delete_bitmap_unlocked(delete_bitmap_snapshot);
}
+ //clear cooldown meta
+ new_tablet_meta->revise_clear_resource_id();
Review Comment:
Should this resource id be refreshed in convert_rowset_ids during restore?
##########
be/src/olap/rowset/beta_rowset.cpp:
##########
@@ -432,13 +432,97 @@ Status BetaRowset::copy_files_to(const std::string& dir,
const RowsetId& new_row
return Status::OK();
}
+Status BetaRowset::download(const StorageResource& dest_fs, const std::string&
dir) {
+ if (is_local()) {
+ DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id();
+ return Status::InternalError("should be remote rowset. tablet_id={}
rowset_id={}",
+ _rowset_meta->tablet_id(),
rowset_id().to_string());
+ }
+
+ if (num_segments() < 1) {
+ return Status::OK();
+ }
+
+ Status status;
+ std::vector<string> linked_success_files;
+ Defer remove_linked_files {[&]() { // clear download files if errors happen
Review Comment:
Using linked_files as the name is confusing because there is no link operation
here.
##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -566,19 +569,67 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
}
std::vector<RowsetMetaSharedPtr> rs_metas;
+ RowsetMetaSharedPtr rsm;
Review Comment:
Please add some comments
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type
type, const std::string& l
SnapshotLoader::~SnapshotLoader() = default;
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+ const std::string& dir, const
std::string& rowset,
+ std::vector<std::string>*
remote_files) {
+ bool exists = true;
+ std::vector<io::FileInfo> files;
+ RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+ for (auto& tmp_file : files) {
+ io::Path path(tmp_file.file_name);
+ std::string file_name = path.filename();
+
+ if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+ !_end_with(file_name, ".idx")) {
+ continue;
+ }
+ remote_files->push_back(file_name);
+ }
+
+ return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+ io::RemoteFileSystem* cold_fs,
+ const std::string& remote_seg_path,
+ const std::string& local_seg_path,
+ const std::string& dest_seg_path) {
+ RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+ // calc md5sum of localfile
+ std::string md5sum;
+ RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path,
&md5sum));
+
+ RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path,
dest_seg_path, md5sum));
+
+ //delete local file
+
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+ return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ io::RemoteFileSystem* cold_fs, const
std::string& rowset,
+ int segments, int have_inverted_index) {
+ Status res = Status::OK();
+
+ std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX,
tablet_id);
+
+ for (int i = 0; i < segments; i++) {
+ std::string remote_seg_path = fmt::format("{}/{}_{}.dat",
remote_tablet_path, rowset, i);
+ std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path,
rowset, i);
+ std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path,
rowset, i);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_seg_path,
+ local_seg_path,
dest_seg_path));
+ }
+
+ if (!have_inverted_index) {
+ return res;
+ }
+
+ std::vector<std::string> remote_index_files;
+ RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs,
remote_tablet_path, rowset,
+ &remote_index_files));
+
+ for (auto& index_file : remote_index_files) {
+ std::string remote_index_path = fmt::format("{}/{}",
remote_tablet_path, index_file);
+ std::string local_seg_path = fmt::format("{}/{}", local_path,
index_file);
+ std::string dest_seg_path = fmt::format("{}/{}", dest_path,
index_file);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_index_path,
+ local_seg_path,
dest_seg_path));
+ }
+ return res;
+}
+
+static Status upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ const std::string& remote_file) {
+ io::FileReaderSPtr file_reader;
+ Status res = Status::OK();
+
+ std::string full_remote_path = local_path + '/' + remote_file;
Review Comment:
Here I suggest storing the remote file info in proto/JSON format, which is easy to modify
and serialize. Besides, you can name it something like tablet_id.remote_file_info;
this file lives in the tablet's local snapshot dir.
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -754,9 +894,9 @@ Status SnapshotLoader::move(const std::string&
snapshot_path, TabletSharedPtr ta
}
// rename the rowset ids and tabletid info in rowset meta
- auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path,
tablet_id,
-
tablet->replica_id(), tablet->table_id(),
-
tablet->partition_id(), schema_hash);
Review Comment:
Here we should refresh the storage policy id and resource id; these two ids
are determined by the FE in the pending state of the restore job.
##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -170,6 +171,7 @@ Result<std::vector<PendingRowsetGuard>>
SnapshotManager::convert_rowset_ids(
new_tablet_meta_pb.set_tablet_id(tablet_id);
*new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
new_tablet_meta_pb.set_replica_id(replica_id);
+ new_tablet_meta_pb.set_storage_policy_id(storage_policy_id);
Review Comment:
```suggestion
if (storage_policy_id > 0) {
new_tablet_meta_pb.set_storage_policy_id(storage_policy_id);
}
```
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type
type, const std::string& l
SnapshotLoader::~SnapshotLoader() = default;
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+ const std::string& dir, const
std::string& rowset,
+ std::vector<std::string>*
remote_files) {
+ bool exists = true;
+ std::vector<io::FileInfo> files;
+ RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+ for (auto& tmp_file : files) {
+ io::Path path(tmp_file.file_name);
+ std::string file_name = path.filename();
+
+ if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+ !_end_with(file_name, ".idx")) {
+ continue;
+ }
+ remote_files->push_back(file_name);
+ }
+
+ return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+ io::RemoteFileSystem* cold_fs,
+ const std::string& remote_seg_path,
+ const std::string& local_seg_path,
+ const std::string& dest_seg_path) {
+ RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+ // calc md5sum of localfile
+ std::string md5sum;
+ RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path,
&md5sum));
+
+ RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path,
dest_seg_path, md5sum));
+
+ //delete local file
+
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+ return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ io::RemoteFileSystem* cold_fs, const
std::string& rowset,
Review Comment:
```suggestion
io::RemoteFileSystem* cold_fs, const
std::string& rowset_id,
```
##########
be/src/olap/tablet.cpp:
##########
@@ -2002,6 +2002,17 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) {
return Status::OK();
}
+Status Tablet::download(RowsetSharedPtr rowset, const std::string& dir) {
Review Comment:
This function doesn't seem to be used
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type
type, const std::string& l
SnapshotLoader::~SnapshotLoader() = default;
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+ const std::string& dir, const
std::string& rowset,
+ std::vector<std::string>*
remote_files) {
+ bool exists = true;
+ std::vector<io::FileInfo> files;
+ RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+ for (auto& tmp_file : files) {
+ io::Path path(tmp_file.file_name);
+ std::string file_name = path.filename();
+
+ if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+ !_end_with(file_name, ".idx")) {
+ continue;
+ }
+ remote_files->push_back(file_name);
+ }
+
+ return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+ io::RemoteFileSystem* cold_fs,
+ const std::string& remote_seg_path,
+ const std::string& local_seg_path,
+ const std::string& dest_seg_path) {
+ RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+ // calc md5sum of localfile
+ std::string md5sum;
+ RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path,
&md5sum));
+
+ RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path,
dest_seg_path, md5sum));
+
+ //delete local file
+
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+ return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
+ const std::string& local_path, const
std::string& dest_path,
+ io::RemoteFileSystem* cold_fs, const
std::string& rowset,
+ int segments, int have_inverted_index) {
+ Status res = Status::OK();
+
+ std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX,
tablet_id);
+
+ for (int i = 0; i < segments; i++) {
+ std::string remote_seg_path = fmt::format("{}/{}_{}.dat",
remote_tablet_path, rowset, i);
+ std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path,
rowset, i);
+ std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path,
rowset, i);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_seg_path,
+ local_seg_path,
dest_seg_path));
+ }
+
+ if (!have_inverted_index) {
+ return res;
+ }
+
+ std::vector<std::string> remote_index_files;
+ RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs,
remote_tablet_path, rowset,
+ &remote_index_files));
+
+ for (auto& index_file : remote_index_files) {
+ std::string remote_index_path = fmt::format("{}/{}",
remote_tablet_path, index_file);
+ std::string local_seg_path = fmt::format("{}/{}", local_path,
index_file);
+ std::string dest_seg_path = fmt::format("{}/{}", dest_path,
index_file);
+
+ RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs,
remote_index_path,
+ local_seg_path,
dest_seg_path));
+ }
+ return res;
+}
+
+static Status upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t
tablet_id,
Review Comment:
Please add comments to explain this function.
##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -566,19 +569,67 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
}
std::vector<RowsetMetaSharedPtr> rs_metas;
+ RowsetMetaSharedPtr rsm;
+ bool have_remote_file = false;
+ io::FileWriterPtr file_writer;
+
for (auto& rs : consistent_rowsets) {
if (rs->is_local()) {
// local rowset
res = rs->link_files_to(schema_full_path, rs->rowset_id());
if (!res.ok()) {
break;
}
+ rsm = rs->rowset_meta();
+ } else {
+ std::string rowset_meta_str;
+ RowsetMetaPB rs_meta_pb;
+ rs->rowset_meta()->to_rowset_pb(&rs_meta_pb);
+ rs_meta_pb.SerializeToString(&rowset_meta_str);
+
+ RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
+ rowset_meta->init(rowset_meta_str);
+
+ rsm = rowset_meta;
+
+ // save_remote_file info
Review Comment:
I suggest using the proto format for remote_file_info.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java:
##########
@@ -429,15 +429,17 @@ public void resetPartitionIdForRestore(
idToStoragePolicy = Maps.newHashMap();
for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) {
- idToDataProperty.put(entry.getKey(),
origIdToDataProperty.get(entry.getValue()));
+ idToDataProperty.put(entry.getKey(), reserveStoragePolicy
+ ? origIdToDataProperty.get(entry.getValue()) :
DataProperty.DEFAULT_HDD_DATA_PROPERTY);
Review Comment:
If reserveStoragePolicy is false, we should reset the storage policy here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]