This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit 053c8c303f7047e67677eae1c0cc138f54913fb9 Author: Jack Drogon <[email protected]> AuthorDate: Wed Jun 7 21:35:15 2023 +0800 [feature](backup-restore) Add local backup/restore not upload/download by broker (#20492) --- be/src/agent/task_worker_pool.cpp | 28 ++- be/src/runtime/snapshot_loader.cpp | 269 +++++++++++++++++++++ be/src/runtime/snapshot_loader.h | 5 + be/src/service/backend_service.cpp | 14 +- .../org/apache/doris/analysis/RestoreStmt.java | 32 ++- .../apache/doris/analysis/ShowSnapshotStmt.java | 33 ++- .../org/apache/doris/backup/BackupHandler.java | 132 +++++++--- .../java/org/apache/doris/backup/BackupJob.java | 25 +- .../org/apache/doris/backup/BackupJobInfo.java | 93 ++++++- .../java/org/apache/doris/backup/BackupMeta.java | 14 ++ .../java/org/apache/doris/backup/Repository.java | 2 + .../java/org/apache/doris/backup/RestoreJob.java | 206 +++++++++++++++- .../java/org/apache/doris/backup/Snapshot.java | 69 ++++++ .../apache/doris/service/FrontendServiceImpl.java | 194 ++++++++++++++- .../java/org/apache/doris/task/DownloadTask.java | 36 ++- gensrc/thrift/AgentService.thrift | 11 + gensrc/thrift/FrontendService.thrift | 48 ++++ gensrc/thrift/Status.thrift | 3 + 18 files changed, 1147 insertions(+), 67 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f596603c35..d830d592e2 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -761,20 +761,28 @@ void TaskWorkerPool::_download_worker_thread_callback() { _tasks.pop_front(); } LOG(INFO) << "get download task. 
signature=" << agent_task_req.signature - << ", job_id=" << download_request.job_id; + << ", job_id=" << download_request.job_id + << "task detail: " << apache::thrift::ThriftDebugString(download_request); // TODO: download std::vector<int64_t> downloaded_tablet_ids; - std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( - _env, download_request.job_id, agent_task_req.signature, - download_request.broker_addr, download_request.broker_prop); - Status status = loader->init( - download_request.__isset.storage_backend ? download_request.storage_backend - : TStorageBackendType::type::BROKER, - download_request.__isset.location ? download_request.location : ""); - if (status.ok()) { - status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids); + auto status = Status::OK(); + if (download_request.__isset.remote_tablet_snapshots) { + SnapshotLoader loader(_env, download_request.job_id, agent_task_req.signature); + loader.remote_http_download(download_request.remote_tablet_snapshots, + &downloaded_tablet_ids); + } else { + std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( + _env, download_request.job_id, agent_task_req.signature, + download_request.broker_addr, download_request.broker_prop); + status = loader->init( + download_request.__isset.storage_backend ? download_request.storage_backend + : TStorageBackendType::type::BROKER, + download_request.__isset.location ? 
download_request.location : ""); + if (status.ok()) { + status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids); + } } if (!status.ok()) { diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 4db803b10e..f1b58fa454 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -34,9 +34,12 @@ #include <cstring> #include <filesystem> #include <istream> +#include <unordered_map> #include <utility> #include "common/logging.h" +#include "gutil/strings/split.h" +#include "http/http_client.h" #include "io/fs/broker_file_system.h" #include "io/fs/file_system.h" #include "io/fs/hdfs_file_system.h" @@ -370,6 +373,272 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to return status; } +Status SnapshotLoader::remote_http_download( + const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots, + std::vector<int64_t>* downloaded_tablet_ids) { + LOG(INFO) << fmt::format("begin to download snapshots via http. 
job: {}, task id: {}", _job_id, + _task_id); + constexpr uint32_t kListRemoteFileTimeout = 15; + constexpr uint32_t kDownloadFileMaxRetry = 3; + constexpr uint32_t kGetLengthTimeout = 10; + + // check if job has already been cancelled + int tmp_counter = 1; + RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); + Status status = Status::OK(); + + // Step before, validate all remote + + // Step 1: Validate local tablet snapshot paths + for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + auto& path = remote_tablet_snapshot.local_snapshot_path; + bool res = true; + RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); + if (!res) { + std::stringstream ss; + auto err_msg = + fmt::format("snapshot path is not directory or does not exist: {}", path); + LOG(WARNING) << err_msg; + return Status::RuntimeError(err_msg); + } + } + + // Step 2: get all local files + struct LocalFileStat { + uint64_t size; + // TODO(Drogon): add md5sum + }; + std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map; + for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + const auto& local_path = remote_tablet_snapshot.local_snapshot_path; + std::vector<std::string> local_files; + RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); + + auto& local_filestat = local_files_map[local_path]; + for (auto& local_file : local_files) { + // add file size + std::string local_file_path = local_path + "/" + local_file; + std::error_code ec; + uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); + if (ec) { + LOG(WARNING) << "download file error" << ec.message(); + return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, + ec.message()); + } + local_filestat[local_file] = {local_file_size}; + } + } + + // Step 3: Validate remote tablet snapshot paths && remote files map + // TODO(Drogon): Add md5sum check + // key is remote 
snapshot paths, value is filelist + // get all these use http download action + // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr + int report_counter = 0; + int total_num = remote_tablet_snapshots.size(); + int finished_num = 0; + struct RemoteFileStat { + // TODO(Drogon): Add md5sum + std::string url; + uint64_t size; + }; + std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>> + remote_files_map; + for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; + auto& remote_files = remote_files_map[remote_path]; + const auto& token = remote_tablet_snapshot.remote_token; + const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; + + // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/ + std::string remote_url_prefix = + fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}", + remote_be_addr.hostname, remote_be_addr.port, token, remote_path); + + string file_list_str; + auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) { + RETURN_IF_ERROR(client->init(remote_url_prefix)); + client->set_timeout_ms(kListRemoteFileTimeout * 1000); + return client->execute(&file_list_str); + }; + RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb)); + std::vector<string> filename_list = + strings::Split(file_list_str, "\n", strings::SkipWhitespace()); + + for (const auto& filename : filename_list) { + std::string remote_file_url = fmt::format( + "http://{}:{}/api/_tablet/_download?token={}&file={}/{}", + remote_tablet_snapshot.remote_be_addr.hostname, + remote_tablet_snapshot.remote_be_addr.port, 
remote_tablet_snapshot.remote_token, + remote_tablet_snapshot.remote_snapshot_path, filename); + + // get file length + uint64_t file_size = 0; + auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) { + RETURN_IF_ERROR(client->init(remote_file_url)); + client->set_timeout_ms(kGetLengthTimeout * 1000); + RETURN_IF_ERROR(client->head()); + RETURN_IF_ERROR(client->get_content_length(&file_size)); + return Status::OK(); + }; + RETURN_IF_ERROR( + HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb)); + + remote_files[filename] = RemoteFileStat {remote_file_url, file_size}; + } + } + + // Step 4: Compare local and remote files && get all need download files + for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, + TTaskType::type::DOWNLOAD)); + + const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; + const auto& local_path = remote_tablet_snapshot.local_snapshot_path; + auto& remote_files = remote_files_map[remote_path]; + auto& local_files = local_files_map[local_path]; + auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id; + + // get all need download files + std::vector<std::string> need_download_files; + for (const auto& [remote_file, remote_filestat] : remote_files) { + LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file, + remote_filestat.size); + auto it = local_files.find(remote_file); + if (it == local_files.end()) { + need_download_files.emplace_back(remote_file); + continue; + } + if (_end_with(remote_file, ".hdr")) { + need_download_files.emplace_back(remote_file); + continue; + } + + if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) { + need_download_files.emplace_back(remote_file); + continue; + } + // TODO(Drogon): check by md5sum, if not match then download + + LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file); + } + + 
auto local_tablet_id = remote_tablet_snapshot.local_tablet_id; + TabletSharedPtr tablet = + _env->storage_engine()->tablet_manager()->get_tablet(local_tablet_id); + if (tablet == nullptr) { + std::stringstream ss; + ss << "failed to get local tablet: " << local_tablet_id; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + DataDir* data_dir = tablet->data_dir(); + + // download all need download files + uint64_t total_file_size = 0; + MonotonicStopWatch watch; + watch.start(); + for (auto& filename : need_download_files) { + auto& remote_filestat = remote_files[filename]; + auto file_size = remote_filestat.size; + auto& remote_file_url = remote_filestat.url; + + // check disk capacity + if (data_dir->reach_capacity_limit(file_size)) { + return Status::InternalError("Disk reach capacity limit"); + } + + total_file_size += file_size; + uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; + if (estimate_timeout < config::download_low_speed_time) { + estimate_timeout = config::download_low_speed_time; + } + + std::string local_filename; + RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, &local_filename)); + std::string local_file_path = local_path + "/" + local_filename; + + LOG(INFO) << "clone begin to download file from: " << remote_file_url + << " to: " << local_file_path << ". 
size(B): " << file_size + << ", timeout(s): " << estimate_timeout; + + auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path, + file_size](HttpClient* client) { + RETURN_IF_ERROR(client->init(remote_file_url)); + client->set_timeout_ms(estimate_timeout * 1000); + RETURN_IF_ERROR(client->download(local_file_path)); + + std::error_code ec; + // Check file length + uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec); + if (ec) { + LOG(WARNING) << "download file error" << ec.message(); + return Status::IOError("can't retrive file_size of {}, due to {}", + local_file_path, ec.message()); + } + if (local_file_size != file_size) { + LOG(WARNING) << "download file length error" + << ", remote_path=" << remote_file_url + << ", file_size=" << file_size + << ", local_file_size=" << local_file_size; + return Status::InternalError("downloaded file size is not equal"); + } + chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR); + return Status::OK(); + }; + RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb)); + + // local_files always keep the updated local files + local_files[filename] = LocalFileStat {file_size}; + } + + uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000; + total_time_ms = total_time_ms > 0 ? total_time_ms : 0; + double copy_rate = 0.0; + if (total_time_ms > 0) { + copy_rate = total_file_size / ((double)total_time_ms) / 1000; + } + LOG(INFO) << fmt::format( + "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: " + "{} ms, rate: {} MB/s", + remote_tablet_id, local_tablet_id, total_file_size, total_time_ms, copy_rate); + + // local_files: contain all remote files and local files + // finally, delete local files which are not in remote + for (const auto& [local_file, local_filestat] : local_files) { + // replace the tablet id in local file name with the remote tablet id, + // in order to compare the file name. 
+ std::string new_name; + Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name); + if (!st.ok()) { + LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st + << ". ignore it"; + continue; + } + VLOG_CRITICAL << "new file name after replace tablet id: " << new_name; + const auto& find = remote_files.find(new_name); + if (find != remote_files.end()) { + continue; + } + + // delete + std::string full_local_file = local_path + "/" + local_file; + LOG(INFO) << "begin to delete local snapshot file: " << full_local_file + << ", it does not exist in remote"; + if (remove(full_local_file.c_str()) != 0) { + LOG(WARNING) << "failed to delete unknown local file: " << full_local_file + << ", error: " << strerror(errno) + << ", file size: " << local_filestat.size << ", ignore it"; + } + } + + ++finished_num; + } + + LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id; + return status; +} + // move the snapshot files in snapshot_path // to tablet_path // If overwrite, just replace the tablet_path with snapshot_path, diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h index 9e7a22d7a3..c0d1f0f708 100644 --- a/be/src/runtime/snapshot_loader.h +++ b/be/src/runtime/snapshot_loader.h @@ -33,6 +33,8 @@ namespace io { class RemoteFileSystem; } // namespace io +class TRemoteTabletSnapshot; + struct FileStat { std::string name; std::string md5; @@ -77,6 +79,9 @@ public: Status download(const std::map<std::string, std::string>& src_to_dest_path, std::vector<int64_t>* downloaded_tablet_ids); + Status remote_http_download(const std::vector<TRemoteTabletSnapshot>& remote_tablets, + std::vector<int64_t>* downloaded_tablet_ids); + Status move(const std::string& snapshot_path, TabletSharedPtr tablet, bool overwrite); private: diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index a5e528b45f..1b4a1ec944 100644 --- a/be/src/service/backend_service.cpp +++ 
b/be/src/service/backend_service.cpp @@ -396,49 +396,49 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, } /// Check args: txn_id, remote_tablet_id, binlog_version, remote_host, remote_port, partition_id, load_id - if (request.__isset.txn_id) { + if (!request.__isset.txn_id) { LOG(WARNING) << "txn_id is empty"; tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); tstatus.__isset.error_msgs = true; tstatus.error_msgs.emplace_back("txn_id is empty"); return; } - if (request.__isset.remote_tablet_id) { + if (!request.__isset.remote_tablet_id) { LOG(WARNING) << "remote_tablet_id is empty"; tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); tstatus.__isset.error_msgs = true; tstatus.error_msgs.emplace_back("remote_tablet_id is empty"); return; } - if (request.__isset.binlog_version) { + if (!request.__isset.binlog_version) { LOG(WARNING) << "binlog_version is empty"; tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); tstatus.__isset.error_msgs = true; tstatus.error_msgs.emplace_back("binlog_version is empty"); return; } - if (request.__isset.remote_host) { + if (!request.__isset.remote_host) { LOG(WARNING) << "remote_host is empty"; tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); tstatus.__isset.error_msgs = true; tstatus.error_msgs.emplace_back("remote_host is empty"); return; } - if (request.__isset.remote_port) { + if (!request.__isset.remote_port) { LOG(WARNING) << "remote_port is empty"; tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); tstatus.__isset.error_msgs = true; tstatus.error_msgs.emplace_back("remote_port is empty"); return; } - if (request.__isset.partition_id) { + if (!request.__isset.partition_id) { LOG(WARNING) << "partition_id is empty"; tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); tstatus.__isset.error_msgs = true; tstatus.error_msgs.emplace_back("partition_id is empty"); return; } - if (request.__isset.load_id) { + if (!request.__isset.load_id) { LOG(WARNING) << "load_id is empty"; 
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR); tstatus.__isset.error_msgs = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index 679ebd8cb8..2382093d6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -17,6 +17,7 @@ package org.apache.doris.analysis; +import org.apache.doris.backup.Repository; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; @@ -45,12 +46,22 @@ public class RestoreStmt extends AbstractBackupStmt { private int metaVersion = -1; private boolean reserveReplica = false; private boolean reserveDynamicPartitionEnable = false; + private boolean isLocal = false; + private byte[] meta = null; + private byte[] jobInfo = null; public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause, Map<String, String> properties) { super(labelName, repoName, restoreTableRefClause, properties); } + public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause, + Map<String, String> properties, byte[] meta, byte[] jobInfo) { + super(labelName, repoName, restoreTableRefClause, properties); + this.meta = meta; + this.jobInfo = jobInfo; + } + public boolean allowLoad() { return allowLoad; } @@ -75,8 +86,23 @@ public class RestoreStmt extends AbstractBackupStmt { return reserveDynamicPartitionEnable; } + public boolean isLocal() { + return isLocal; + } + + public byte[] getMeta() { + return meta; + } + + public byte[] getJobInfo() { + return jobInfo; + } + @Override public void analyze(Analyzer analyzer) throws UserException { + if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) { + isLocal = true; + } super.analyze(analyzer); } @@ -148,8 +174,10 @@ public class 
RestoreStmt extends AbstractBackupStmt { backupTimestamp = copiedProperties.get(PROP_BACKUP_TIMESTAMP); copiedProperties.remove(PROP_BACKUP_TIMESTAMP); } else { - ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, - "Missing " + PROP_BACKUP_TIMESTAMP + " property"); + if (!isLocal) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, + "Missing " + PROP_BACKUP_TIMESTAMP + " property"); + } } // meta version diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java index bdf33ddfec..d10d216b12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java @@ -28,6 +28,11 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; public class ShowSnapshotStmt extends ShowStmt { + public enum SnapshotType { + REMOTE, + LOCAL + } + public static final ImmutableList<String> SNAPSHOT_ALL = new ImmutableList.Builder<String>() .add("Snapshot").add("Timestamp").add("Status") .build(); @@ -39,6 +44,7 @@ public class ShowSnapshotStmt extends ShowStmt { private Expr where; private String snapshotName; private String timestamp; + private SnapshotType snapshotType = SnapshotType.REMOTE; public ShowSnapshotStmt(String repoName, Expr where) { this.repoName = repoName; @@ -87,7 +93,7 @@ public class ShowSnapshotStmt extends ShowStmt { if (!ok) { throw new AnalysisException("Where clause should looks like: SNAPSHOT = 'your_snapshot_name'" - + " [AND TIMESTAMP = '2018-04-18-19-19-10']"); + + " [AND TIMESTAMP = '2018-04-18-19-19-10'] [AND SNAPSHOTTYPE = 'remote' | 'local']"); } } } @@ -116,10 +122,25 @@ public class ShowSnapshotStmt extends ShowStmt { return false; } return true; + } else if (name.equalsIgnoreCase("snapshotType")) { + String snapshotTypeVal = ((StringLiteral) val).getStringValue(); + if 
(Strings.isNullOrEmpty(snapshotTypeVal)) { + return false; + } + // snapshotType now only support "remote" and "local" + switch (snapshotTypeVal.toLowerCase()) { + case "remote": + snapshotType = SnapshotType.REMOTE; + return true; + case "local": + snapshotType = SnapshotType.LOCAL; + return true; + default: + return false; + } + } else { + return false; } - - return false; - } public String getRepoName() { @@ -134,6 +155,10 @@ public class ShowSnapshotStmt extends ShowStmt { return timestamp; } + public String getSnapshotType() { + return snapshotType.name(); + } + @Override public ShowResultSetMetaData getMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index d149ad574d..6619c457b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -46,6 +46,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.task.DirMoveTask; @@ -56,13 +57,16 @@ import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.TTaskType; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.File; import 
java.io.IOException; @@ -75,7 +79,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -104,6 +110,12 @@ public class BackupHandler extends MasterDaemon implements Writable { private Env env; + // map to store backup info, key is label name, value is Pair<meta, info>, meta && info is bytes + // this map not present in persist && only in fe master memory + // one table only keep one snapshot info, only keep last + private final Map<String, Snapshot> localSnapshots = new HashMap<>(); + private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock(); + public BackupHandler() { // for persist } @@ -241,9 +253,13 @@ public class BackupHandler extends MasterDaemon implements Writable { public void process(AbstractBackupStmt stmt) throws DdlException { // check if repo exist String repoName = stmt.getRepoName(); - Repository repository = repoMgr.getRepo(repoName); - if (repository == null) { - ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repoName + " does not exist"); + Repository repository = null; + if (!repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) { + repository = repoMgr.getRepo(repoName); + if (repository == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, + "Repository " + repoName + " does not exist"); + } } // check if db exist @@ -286,7 +302,7 @@ public class BackupHandler extends MasterDaemon implements Writable { } private void backup(Repository repository, Database db, BackupStmt stmt) throws DdlException { - if (repository.isReadOnly()) { + if (repository != null && repository.isReadOnly()) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repository.getName() + " is read only"); } 
@@ -357,25 +373,29 @@ public class BackupHandler extends MasterDaemon implements Writable { } // Check if label already be used - List<String> existSnapshotNames = Lists.newArrayList(); - Status st = repository.listSnapshots(existSnapshotNames); - if (!st.ok()) { - ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg()); - } - if (existSnapshotNames.contains(stmt.getLabel())) { - if (stmt.getType() == BackupType.FULL) { - ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '" - + stmt.getLabel() + "' already exist in repository"); - } else { - ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support " - + "incremental backup"); + long repoId = -1; + if (repository != null) { + List<String> existSnapshotNames = Lists.newArrayList(); + Status st = repository.listSnapshots(existSnapshotNames); + if (!st.ok()) { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg()); + } + if (existSnapshotNames.contains(stmt.getLabel())) { + if (stmt.getType() == BackupType.FULL) { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '" + + stmt.getLabel() + "' already exist in repository"); + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support " + + "incremental backup"); + } } + repoId = repository.getId(); } // Create a backup job BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(), ClusterNamespace.getNameFromFullName(db.getFullName()), - tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repository.getId()); + tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId); // write log env.getEditLog().logBackupJob(backupJob); @@ -386,26 +406,62 @@ public class BackupHandler extends MasterDaemon implements Writable { } private void restore(Repository repository, Database db, RestoreStmt stmt) throws DdlException { - // Check if snapshot exist in repository - List<BackupJobInfo> infos = 
Lists.newArrayList(); - Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos); - if (!status.ok()) { - ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, - "Failed to get info of snapshot '" + stmt.getLabel() + "' because: " - + status.getErrMsg() + ". Maybe specified wrong backup timestamp"); + BackupJobInfo jobInfo; + if (stmt.isLocal()) { + String jobInfoString = new String(stmt.getJobInfo()); + jobInfo = BackupJobInfo.genFromJson(jobInfoString); + + if (jobInfo.extraInfo == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty"); + } + if (jobInfo.extraInfo.beNetworkMap == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info be network map"); + } + if (Strings.isNullOrEmpty(jobInfo.extraInfo.token)) { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info token"); + } + } else { + // Check if snapshot exist in repository + List<BackupJobInfo> infos = Lists.newArrayList(); + Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos); + if (!status.ok()) { + ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, + "Failed to get info of snapshot '" + stmt.getLabel() + "' because: " + + status.getErrMsg() + ". Maybe specified wrong backup timestamp"); + } + + // Check if all restore objects are exist in this snapshot. + // Also remove all unrelated objs + Preconditions.checkState(infos.size() == 1); + jobInfo = infos.get(0); } - // Check if all restore objects are exist in this snapshot. 
- // Also remove all unrelated objs - Preconditions.checkState(infos.size() == 1); - BackupJobInfo jobInfo = infos.get(0); checkAndFilterRestoreObjsExistInSnapshot(jobInfo, stmt.getAbstractBackupTableRefClause()); // Create a restore job - RestoreJob restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), + RestoreJob restoreJob; + if (stmt.isLocal()) { + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(stmt.getMeta()); + DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream); + try { + BackupMeta backupMeta = BackupMeta.read(dataInputStream); + String backupTimestamp = + TimeUtils.longToTimeString(jobInfo.getBackupTime(), TimeUtils.DATETIME_FORMAT_WITH_HYPHEN); + restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp, + db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), + stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), + stmt.reserveDynamicPartitionEnable(), + env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); + } catch (IOException e) { + throw new DdlException(e.getMessage()); + } + } else { + restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), env, repository.getId()); + } + env.getEditLog().logRestoreJob(restoreJob); // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed. 
@@ -667,6 +723,24 @@ public class BackupHandler extends MasterDaemon implements Writable { return false; } + public void addSnapshot(String labelName, Snapshot snapshot) { + localSnapshotsLock.writeLock().lock(); + try { + localSnapshots.put(labelName, snapshot); + } finally { + localSnapshotsLock.writeLock().unlock(); + } + } + + public Snapshot getSnapshot(String labelName) { + localSnapshotsLock.readLock().lock(); + try { + return localSnapshots.get(labelName); + } finally { + localSnapshotsLock.readLock().unlock(); + } + } + public static BackupHandler read(DataInput in) throws IOException { BackupHandler backupHandler = new BackupHandler(); backupHandler.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 2300cd1e01..60d486c310 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -114,6 +114,9 @@ public class BackupJob extends AbstractJob { // backup properties private Map<String, String> properties = Maps.newHashMap(); + private byte[] metaInfoBytes = null; + private byte[] jobInfoBytes = null; + public BackupJob() { super(JobType.BACKUP); } @@ -282,7 +285,7 @@ public class BackupJob extends AbstractJob { } // get repo if not set - if (repo == null) { + if (repo == null && repoId != Repository.KEEP_ON_LOCAL_REPO_ID) { repo = env.getBackupHandler().getRepoMgr().getRepo(repoId); if (repo == null) { status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId); @@ -565,6 +568,11 @@ public class BackupJob extends AbstractJob { } private void uploadSnapshot() { + if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { + state = BackupJobState.UPLOADING; + return; + } + // reuse this set to save all unfinished tablets unfinishedTaskIds.clear(); taskProgress.clear(); @@ -673,6 +681,8 @@ public class BackupJob extends AbstractJob { } 
backupMeta.writeToFile(metaInfoFile); localMetaInfoFilePath = metaInfoFile.getAbsolutePath(); + // read meta info to metaInfoBytes + metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); // 3. save job info file jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId, @@ -685,6 +695,8 @@ public class BackupJob extends AbstractJob { } jobInfo.writeToFile(jobInfoFile); localJobInfoFilePath = jobInfoFile.getAbsolutePath(); + // read job info to jobInfoBytes + jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); } catch (Exception e) { status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage()); return; @@ -697,7 +709,9 @@ public class BackupJob extends AbstractJob { jobInfo = null; // release all snapshots before clearing the snapshotInfos. - releaseSnapshots(); + if (repoId != Repository.KEEP_ON_LOCAL_REPO_ID) { + releaseSnapshots(); + } snapshotInfos.clear(); @@ -724,6 +738,13 @@ public class BackupJob extends AbstractJob { } private void uploadMetaAndJobInfoFile() { + if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { + state = BackupJobState.FINISHED; + Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); + env.getBackupHandler().addSnapshot(label, snapshot); + return; + } + String remoteMetaInfoFile = repo.assembleMetaInfoFilePath(label); if (!uploadFile(localMetaInfoFilePath, remoteMetaInfoFile)) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java index ec622dbdca..4457740440 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java @@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TNetworkAddress; import 
com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -92,10 +93,39 @@ public class BackupJobInfo implements Writable { @SerializedName("meta_version") public int metaVersion; + @SerializedName("tablet_be_map") + public Map<Long, Long> tabletBeMap = Maps.newHashMap(); + + @SerializedName("tablet_snapshot_path_map") + public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap(); + + public static class ExtraInfo { + public static class NetworkAddrss { + @SerializedName("ip") + public String ip; + @SerializedName("port") + public int port; + } + + @SerializedName("be_network_map") + public Map<Long, NetworkAddrss> beNetworkMap = Maps.newHashMap(); + + @SerializedName("token") + public String token; + } + + @SerializedName("extra_info") + public ExtraInfo extraInfo; + + // This map is used to save the table alias mapping info when processing a restore job. // origin -> alias public Map<String, String> tblAlias = Maps.newHashMap(); + public long getBackupTime() { + return backupTime; + } + public void initBackupJobInfoAfterDeserialize() { // transform success if (successJson.equals("succeed")) { @@ -487,6 +517,62 @@ public class BackupJobInfo implements Writable { return Joiner.on("/").join(pathSeg); } + // struct TRemoteTabletSnapshot { + // 1: optional i64 local_tablet_id + // 2: optional string local_snapshot_path + // 3: optional i64 remote_tablet_id + // 4: optional i64 remote_be_id + // 5: optional Types.TNetworkAddress remote_be_addr + // 6: optional string remote_snapshot_path + // 7: optional string remote_token + // } + // (schema hash is not a field; it is appended to remote_snapshot_path) + + public String getTabletSnapshotPath(Long tabletId) { + return tabletSnapshotPathMap.get(tabletId); + } + + public Long getBeId(Long tabletId) { + return tabletBeMap.get(tabletId); + } + + public String getToken() { + return extraInfo.token; + } + + public TNetworkAddress getBeAddr(Long beId) { + ExtraInfo.NetworkAddrss addr = extraInfo.beNetworkMap.get(beId); + if (addr ==
null) { + return null; + } + + return new TNetworkAddress(addr.ip, addr.port); + } + + // TODO(Drogon): improve the performance of this lookup + public Long getSchemaHash(long tableId, long partitionId, long indexId) { + for (BackupOlapTableInfo backupOlapTableInfo : backupOlapTableObjects.values()) { + if (backupOlapTableInfo.id != tableId) { + continue; + } + + for (BackupPartitionInfo backupPartitionInfo : backupOlapTableInfo.partitions.values()) { + if (backupPartitionInfo.id != partitionId) { + continue; + } + + for (BackupIndexInfo backupIndexInfo : backupPartitionInfo.indexes.values()) { + if (backupIndexInfo.id != indexId) { + continue; + } + + return Long.valueOf(backupIndexInfo.schemaHash); + } + } + } + return null; + } + public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId, BackupContent content, BackupMeta backupMeta, Map<Long, SnapshotInfo> snapshotInfos) { @@ -526,8 +612,11 @@ public class BackupJobInfo implements Writable { } } else { for (Tablet tablet : index.getTablets()) { + SnapshotInfo snapshotInfo = snapshotInfos.get(tablet.getId()); idxInfo.tablets.put(tablet.getId(), - Lists.newArrayList(snapshotInfos.get(tablet.getId()).getFiles())); + Lists.newArrayList(snapshotInfo.getFiles())); + jobInfo.tabletBeMap.put(tablet.getId(), snapshotInfo.getBeId()); + jobInfo.tabletSnapshotPathMap.put(tablet.getId(), snapshotInfo.getPath()); } } idxInfo.tabletsOrder.addAll(index.getTabletIdsInOrder()); @@ -578,7 +667,7 @@ public class BackupJobInfo implements Writable { return genFromJson(json); } - private static BackupJobInfo genFromJson(String json) { + public static BackupJobInfo genFromJson(String json) { /* parse the json string: * { * "backup_time": 1522231864000, diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java index e059d55be6..e22bb7f33c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java +++
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java @@ -21,8 +21,10 @@ import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Table; import org.apache.doris.common.io.Writable; import org.apache.doris.meta.MetaContext; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataInputStream; @@ -38,10 +40,13 @@ import java.util.Map; public class BackupMeta implements Writable { // tbl name -> tbl + @SerializedName(value = "tblNameMap") private Map<String, Table> tblNameMap = Maps.newHashMap(); // tbl id -> tbl + @SerializedName(value = "tblIdMap") private Map<Long, Table> tblIdMap = Maps.newHashMap(); // resource name -> resource + @SerializedName(value = "resourceNameMap") private Map<String, Resource> resourceNameMap = Maps.newHashMap(); private BackupMeta() { @@ -136,4 +141,13 @@ public class BackupMeta implements Writable { resourceNameMap.put(resource.getName(), resource); } } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + return toJson(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java index 6d421332dc..ba95a77352 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java @@ -97,6 +97,8 @@ public class Repository implements Writable { public static final String FILE_REPO_INFO = "__repo_info"; public static final String FILE_META_INFO = "__meta"; public static final String DIR_SNAPSHOT_CONTENT = "__ss_content"; + public static final String KEEP_ON_LOCAL_REPO_NAME = "__keep_on_local__"; + public static final long KEEP_ON_LOCAL_REPO_ID = -1; private static final Logger LOG = LogManager.getLogger(Repository.class); private static final String 
PATH_DELIMITER = "/"; private static final String CHECKSUM_SEPARATOR = "."; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 60ff99c4b9..e9e2eee824 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -71,6 +71,8 @@ import org.apache.doris.task.DownloadTask; import org.apache.doris.task.ReleaseSnapshotTask; import org.apache.doris.task.SnapshotTask; import org.apache.doris.thrift.TFinishTaskRequest; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TRemoteTabletSnapshot; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -185,6 +187,18 @@ public class RestoreJob extends AbstractJob { properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); } + public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, + ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, + boolean reserveDynamicPartitionEnable, Env env, long repoId, BackupMeta backupMeta) { + this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, + reserveDynamicPartitionEnable, env, repoId); + this.backupMeta = backupMeta; + } + + public boolean isFromLocalSnapshot() { + return repoId == Repository.KEEP_ON_LOCAL_REPO_ID; + } + public RestoreJobState getState() { return state; } @@ -324,7 +338,7 @@ public class RestoreJob extends AbstractJob { } // get repo if not set - if (repo == null) { + if (repo == null && !isFromLocalSnapshot()) { repo = env.getBackupHandler().getRepoMgr().getRepo(repoId); if (repo == null) { status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId); @@ -1109,6 
+1123,15 @@ public class RestoreJob extends AbstractJob { } private boolean downloadAndDeserializeMetaInfo() { + if (isFromLocalSnapshot()) { + if (backupMeta != null) { + return true; + } + + status = new Status(ErrCode.COMMON_ERROR, "backupMeta is null"); + return false; + } + List<BackupMeta> backupMetas = Lists.newArrayList(); Status st = repo.getSnapshotMetaFile(jobInfo.name, backupMetas, this.metaVersion == -1 ? jobInfo.metaVersion : this.metaVersion); @@ -1251,7 +1274,15 @@ public class RestoreJob extends AbstractJob { } private void downloadSnapshots() { - // Categorize snapshot infos by db id. + if (isFromLocalSnapshot()) { + downloadLocalSnapshots(); + } else { + downloadRemoteSnapshots(); + } + } + + private void downloadRemoteSnapshots() { + // Categorize snapshot infos by db id. ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create(); for (SnapshotInfo info : snapshotInfos.values()) { dbToSnapshotInfos.put(info.getDbId(), info); @@ -1289,7 +1320,8 @@ public class RestoreJob extends AbstractJob { LOG.debug("backend {} has {} batch, total {} tasks, {}", beId, batchNum, totalNum, this); - List<FsBroker> brokerAddrs = Lists.newArrayList(); + List<FsBroker> brokerAddrs = null; + brokerAddrs = Lists.newArrayList(); Status st = repo.getBrokerAddress(beId, env, brokerAddrs); if (!st.ok()) { status = st; @@ -1401,6 +1433,174 @@ public class RestoreJob extends AbstractJob { LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this); } + private void downloadLocalSnapshots() { + // Categorize snapshot infos by db id.
+ ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create(); + for (SnapshotInfo info : snapshotInfos.values()) { + dbToSnapshotInfos.put(info.getDbId(), info); + } + + // Send download tasks + unfinishedSignatureToId.clear(); + taskProgress.clear(); + taskErrMsg.clear(); + AgentBatchTask batchTask = new AgentBatchTask(); + for (long dbId : dbToSnapshotInfos.keySet()) { + List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId); + + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + status = new Status(ErrCode.NOT_FOUND, "db " + dbId + " does not exist"); + return; + } + + // We classify the snapshot info by backend + ArrayListMultimap<Long, SnapshotInfo> beToSnapshots = ArrayListMultimap.create(); + for (SnapshotInfo info : infos) { + beToSnapshots.put(info.getBeId(), info); + } + + db.readLock(); + try { + for (Long beId : beToSnapshots.keySet()) { + List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId); + int totalNum = beSnapshotInfos.size(); + // each backend is allotted at most 3 tasks + int batchNum = Math.min(totalNum, 3); + // each task contains several download sub tasks + int taskNumPerBatch = Math.max(totalNum / batchNum, 1); + + // allot tasks + int index = 0; + for (int batch = 0; batch < batchNum; batch++) { + List<TRemoteTabletSnapshot> remoteTabletSnapshots = Lists.newArrayList(); + int currentBatchTaskNum = (batch == batchNum - 1) ?
totalNum - index : taskNumPerBatch; + for (int j = 0; j < currentBatchTaskNum; j++) { + TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot(); + + SnapshotInfo info = beSnapshotInfos.get(index++); + Table tbl = db.getTableNullable(info.getTblId()); + if (tbl == null) { + status = new Status(ErrCode.NOT_FOUND, "restored table " + + info.getTabletId() + " does not exist"); + return; + } + OlapTable olapTbl = (OlapTable) tbl; + olapTbl.readLock(); + try { + Partition part = olapTbl.getPartition(info.getPartitionId()); + if (part == null) { + status = new Status(ErrCode.NOT_FOUND, "partition " + + info.getPartitionId() + " does not exist in restored table: " + + tbl.getName()); + return; + } + + MaterializedIndex idx = part.getIndex(info.getIndexId()); + if (idx == null) { + status = new Status(ErrCode.NOT_FOUND, "index " + info.getIndexId() + + " does not exist in partion " + part.getName() + + "of restored table " + tbl.getName()); + return; + } + + Tablet tablet = idx.getTablet(info.getTabletId()); + if (tablet == null) { + status = new Status(ErrCode.NOT_FOUND, + "tablet " + info.getTabletId() + " does not exist in restored table " + + tbl.getName()); + return; + } + + Replica replica = tablet.getReplicaByBackendId(info.getBeId()); + if (replica == null) { + status = new Status(ErrCode.NOT_FOUND, + "replica in be " + info.getBeId() + " of tablet " + + tablet.getId() + " does not exist in restored table " + + tbl.getName()); + return; + } + + IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(), + info.getTabletId(), replica.getId()); + IdChain repoIds = fileMapping.get(catalogIds); + if (repoIds == null) { + status = new Status(ErrCode.NOT_FOUND, + "failed to get id mapping of catalog ids: " + catalogIds.toString()); + return; + } + + SnapshotInfo snapshotInfo = snapshotInfos.get(info.getTabletId(), info.getBeId()); + Preconditions.checkNotNull(snapshotInfo, info.getTabletId() + "-" + info.getBeId()); + // download to previous 
exist snapshot dir + String dest = snapshotInfo.getTabletPath(); + + Long localTabletId = info.getTabletId(); + String localSnapshotPath = dest; + Long remoteTabletId = repoIds.getTabletId(); + Long remoteBeId = jobInfo.getBeId(remoteTabletId); + String remoteSnapshotPath = jobInfo.getTabletSnapshotPath(remoteTabletId); + if (remoteSnapshotPath == null) { + status = new Status(ErrCode.NOT_FOUND, + "failed to get remote snapshot path of tablet: " + remoteTabletId); + return; + } + Long schemaHash = jobInfo.getSchemaHash( + repoIds.getTblId(), repoIds.getPartId(), repoIds.getIdxId()); + if (schemaHash == null) { + status = new Status(ErrCode.NOT_FOUND, + "failed to get schema hash of table: " + repoIds.getTblId() + + ", partition: " + repoIds.getPartId() + + ", index: " + repoIds.getIdxId()); + return; + } + // remoteSnapshotPath = "${remoteSnapshotPath}/${remoteTabletId}/${schemaHash}" + remoteSnapshotPath = + String.format("%s/%d/%d", remoteSnapshotPath, remoteTabletId, schemaHash); + TNetworkAddress remoteBeAddr = jobInfo.getBeAddr(remoteBeId); + if (remoteBeAddr == null) { + status = new Status(ErrCode.NOT_FOUND, + "failed to get remote be address of be: " + remoteBeId); + return; + } + String remoteToken = jobInfo.getToken(); + + remoteTabletSnapshot.setLocalTabletId(localTabletId); + remoteTabletSnapshot.setLocalSnapshotPath(localSnapshotPath); + remoteTabletSnapshot.setRemoteTabletId(remoteTabletId); + remoteTabletSnapshot.setRemoteBeId(remoteBeId); + remoteTabletSnapshot.setRemoteBeAddr(remoteBeAddr); + remoteTabletSnapshot.setRemoteSnapshotPath(remoteSnapshotPath); + remoteTabletSnapshot.setRemoteToken(remoteToken); + + remoteTabletSnapshots.add(remoteTabletSnapshot); + } finally { + olapTbl.readUnlock(); + } + } + long signature = env.getNextId(); + DownloadTask task = new DownloadTask(null, beId, signature, jobId, dbId, remoteTabletSnapshots); + batchTask.addTask(task); + unfinishedSignatureToId.put(signature, beId); + } + } + } finally { + 
db.readUnlock(); + } + } + + // send task + for (AgentTask task : batchTask.getAllTasks()) { + AgentTaskQueue.addTask(task); + } + AgentTaskExecutor.submit(batchTask); + + state = RestoreJobState.DOWNLOADING; + + // No edit log here + LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this); + } + private void waitingAllDownloadFinished() { if (unfinishedSignatureToId.isEmpty()) { downloadFinishedTime = System.currentTimeMillis(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java new file mode 100644 index 0000000000..b26cb2e1e7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.backup; + +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +public class Snapshot { + @SerializedName(value = "label") + private String label = null; + + @SerializedName(value = "meta") + private byte[] meta = null; + + @SerializedName(value = "jobInfo") + private byte[] jobInfo = null; + + @SerializedName(value = "createTime") + private String createTime = null; + + public Snapshot() { + } + + public Snapshot(String label, byte[] meta, byte[] jobInfo) { + this.label = label; + this.meta = meta; + this.jobInfo = jobInfo; + } + + + public byte[] getMeta() { + return meta; + } + + public byte[] getJobInfo() { + return jobInfo; + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + // return toJson(); + return "Snapshot{" + + "label='" + label + '\'' + + ", meta=" + meta + + ", jobInfo=" + jobInfo + + ", createTime='" + createTime + '\'' + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b650100823..2dbb007498 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -19,11 +19,15 @@ package org.apache.doris.service; import org.apache.doris.alter.SchemaChangeHandler; import org.apache.doris.analysis.AddColumnsClause; +import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.ColumnDef; +import org.apache.doris.analysis.LabelName; +import org.apache.doris.analysis.RestoreStmt; import org.apache.doris.analysis.SetType; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TypeDef; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.backup.Snapshot; import org.apache.doris.catalog.Column; import 
org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; @@ -63,6 +67,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; +import org.apache.doris.qe.DdlExecutor; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.VariableMgr; @@ -106,6 +111,8 @@ import org.apache.doris.thrift.TGetBinlogResult; import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; import org.apache.doris.thrift.TGetQueryStatsRequest; +import org.apache.doris.thrift.TGetSnapshotRequest; +import org.apache.doris.thrift.TGetSnapshotResult; import org.apache.doris.thrift.TGetTablesParams; import org.apache.doris.thrift.TGetTablesResult; import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; @@ -137,11 +144,14 @@ import org.apache.doris.thrift.TReplicaInfo; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TReportRequest; +import org.apache.doris.thrift.TRestoreSnapshotRequest; +import org.apache.doris.thrift.TRestoreSnapshotResult; import org.apache.doris.thrift.TRollbackTxnRequest; import org.apache.doris.thrift.TRollbackTxnResult; import org.apache.doris.thrift.TShowVariableRequest; import org.apache.doris.thrift.TShowVariableResult; import org.apache.doris.thrift.TSnapshotLoaderReportRequest; +import org.apache.doris.thrift.TSnapshotType; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; @@ -2124,15 +2134,15 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new UserException("prev_commit_seq is not set"); } + + // step 1: check auth String cluster = request.getCluster(); if (Strings.isNullOrEmpty(cluster)) { cluster = 
SystemInfoService.DEFAULT_CLUSTER; } - - // step 1: check auth if (Strings.isNullOrEmpty(request.getToken())) { checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTable(), - request.getUserIp(), PrivPredicate.LOAD); + request.getUserIp(), PrivPredicate.SELECT); } // step 3: check database @@ -2181,4 +2191,182 @@ public class FrontendServiceImpl implements FrontendService.Iface { } return result; } + + // getSnapshot + public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.trace("receive get snapshot info request: {}", request); + + TGetSnapshotResult result = new TGetSnapshotResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + result = getSnapshotImpl(request, clientAddr); + } catch (UserException e) { + LOG.warn("failed to get snapshot info: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + + return result; + } + + // getSnapshotImpl + private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp) + throws UserException { + // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type + if (!request.isSetUser()) { + throw new UserException("user is not set"); + } + if (!request.isSetPasswd()) { + throw new UserException("passwd is not set"); + } + if (!request.isSetDb()) { + throw new UserException("db is not set"); + } + if (!request.isSetLabelName()) { + throw new UserException("label_name is not set"); + } + if (!request.isSetSnapshotName()) { + throw new UserException("snapshot_name is not set"); + } + if (!request.isSetSnapshotType()) { + throw new 
UserException("snapshot_type is not set"); + } else if (request.getSnapshotType() != TSnapshotType.LOCAL) { + throw new UserException("snapshot_type is not LOCAL"); + } + + // Step 2: check auth + String cluster = request.getCluster(); + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + + LOG.info("get snapshot info, user: {}, db: {}, label_name: {}, snapshot_name: {}, snapshot_type: {}", + request.getUser(), request.getDb(), request.getLabelName(), request.getSnapshotName(), + request.getSnapshotType()); + if (Strings.isNullOrEmpty(request.getToken())) { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), + request.getTable(), clientIp, PrivPredicate.LOAD); + } + + // Step 3: get snapshot + TGetSnapshotResult result = new TGetSnapshotResult(); + result.setStatus(new TStatus(TStatusCode.OK)); + Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName()); + if (snapshot == null) { + result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); + result.getStatus().addToErrorMsgs("snapshot not exist"); + } else { + result.setMeta(snapshot.getMeta()); + result.setJobInfo(snapshot.getJobInfo()); + } + + return result; + } + + // Restore Snapshot + public TRestoreSnapshotResult restoreSnapshot(TRestoreSnapshotRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.trace("receive restore snapshot info request: {}", request); + + TRestoreSnapshotResult result = new TRestoreSnapshotResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + result = restoreSnapshotImpl(request, clientAddr); + } catch (UserException e) { + LOG.warn("failed to get snapshot info: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); 
+ status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + + return result; + } + + // restoreSnapshotImpl + private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp) + throws UserException { + // Step 1: Check all required arg: user, passwd, db, label_name, repo_name, meta, info + if (!request.isSetUser()) { + throw new UserException("user is not set"); + } + if (!request.isSetPasswd()) { + throw new UserException("passwd is not set"); + } + if (!request.isSetDb()) { + throw new UserException("db is not set"); + } + if (!request.isSetLabelName()) { + throw new UserException("label_name is not set"); + } + if (!request.isSetRepoName()) { + throw new UserException("repo_name is not set"); + } + if (!request.isSetMeta()) { + throw new UserException("meta is not set"); + } + if (!request.isSetJobInfo()) { + throw new UserException("job_info is not set"); + } + + // Step 2: check auth + String cluster = request.getCluster(); + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + + if (Strings.isNullOrEmpty(request.getToken())) { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), + request.getTable(), clientIp, PrivPredicate.LOAD); + } + + // Step 3: get snapshot + TRestoreSnapshotResult result = new TRestoreSnapshotResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + + + LabelName label = new LabelName(request.getDb(), request.getLabelName()); + String repoName = request.getRepoName(); + Map<String, String> properties = request.getProperties(); + RestoreStmt restoreStmt = new RestoreStmt(label, repoName, null, properties, request.getMeta(), + request.getJobInfo()); + LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt); + try { + ConnectContext ctx = ConnectContext.get(); + if (ctx == null) { + ctx = new ConnectContext(); + ctx.setThreadLocalInfo(); + } + ctx.setCluster(cluster); 
+ ctx.setQualifiedUser(request.getUser()); + UserIdentity currentUserIdentity = new UserIdentity(request.getUser(), "%"); + ctx.setCurrentUserIdentity(currentUserIdentity); + + Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx); + restoreStmt.analyze(analyzer); + DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt); + } catch (UserException e) { + LOG.warn("failed to get snapshot info: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + } + + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java index 64b75a70d3..6482c5f807 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java @@ -21,9 +21,11 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.FsBroker; import org.apache.doris.thrift.TDownloadReq; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TRemoteTabletSnapshot; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; +import java.util.List; import java.util.Map; public class DownloadTask extends AgentTask { @@ -34,6 +36,9 @@ public class DownloadTask extends AgentTask { private Map<String, String> brokerProperties; private StorageBackend.StorageType storageType; private String location; + private List<TRemoteTabletSnapshot> remoteTabletSnapshots; + private boolean isFromLocalSnapshot = false; + public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId, Map<String, String> srcToDestPath, FsBroker brokerAddr, Map<String, String> brokerProperties, @@ -45,6 +50,16 @@ public class 
DownloadTask extends AgentTask { this.brokerProperties = brokerProperties; this.storageType = storageType; this.location = location; + this.isFromLocalSnapshot = false; + } + + public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId, + List<TRemoteTabletSnapshot> remoteTabletSnapshots) { + super(resourceInfo, backendId, TTaskType.DOWNLOAD, dbId, -1, -1, -1, -1, signature); + this.jobId = jobId; + this.srcToDestPath = new java.util.HashMap<String, String>(); + this.remoteTabletSnapshots = remoteTabletSnapshots; + this.isFromLocalSnapshot = true; } public long getJobId() { @@ -64,11 +79,22 @@ public class DownloadTask extends AgentTask { } public TDownloadReq toThrift() { - TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port); - TDownloadReq req = new TDownloadReq(jobId, srcToDestPath, address); - req.setBrokerProp(brokerProperties); - req.setStorageBackend(storageType.toThrift()); - req.setLocation(location); + // these fields are required + // 1: required i64 job_id + // 2: required map<string, string> src_dest_map + // 3: required Types.TNetworkAddress broker_addr + TDownloadReq req; + if (isFromLocalSnapshot) { + TNetworkAddress brokerAddr = new TNetworkAddress("", 0); // mock broker address + req = new TDownloadReq(jobId, srcToDestPath, brokerAddr); + req.setRemoteTabletSnapshots(remoteTabletSnapshots); + } else { + TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port); + req = new TDownloadReq(jobId, srcToDestPath, address); + req.setBrokerProp(brokerProperties); + req.setStorageBackend(storageType.toThrift()); + req.setLocation(location); + } return req; } } diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 6b74944cd7..b30c7ed26a 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -288,6 +288,16 @@ struct TUploadReq { 6: optional string location // root path } +struct 
TRemoteTabletSnapshot { + 1: optional i64 local_tablet_id + 2: optional string local_snapshot_path + 3: optional i64 remote_tablet_id + 4: optional i64 remote_be_id + 5: optional Types.TNetworkAddress remote_be_addr + 6: optional string remote_snapshot_path + 7: optional string remote_token +} + struct TDownloadReq { 1: required i64 job_id 2: required map<string, string> src_dest_map @@ -295,6 +305,7 @@ struct TDownloadReq { 4: optional map<string, string> broker_prop 5: optional Types.TStorageBackendType storage_backend = Types.TStorageBackendType.BROKER 6: optional string location // root path + 7: optional list<TRemoteTabletSnapshot> remote_tablet_snapshots } struct TSnapshotRequest { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5aecdc4ba2..433b137a37 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -974,6 +974,52 @@ struct TGetTabletReplicaInfosResult { 3: optional string token } +enum TSnapshotType { + REMOTE = 0, + LOCAL = 1, +} + +struct TGetSnapshotRequest { + 1: optional string cluster + 2: optional string user + 3: optional string passwd + 4: optional string db + 5: optional string table + 6: optional string token + 7: optional string label_name + 8: optional string snapshot_name + 9: optional TSnapshotType snapshot_type +} + +struct TGetSnapshotResult { + 1: optional Status.TStatus status + 2: optional binary meta + 3: optional binary job_info +} + +struct TTableRef { + 1: optional string table +} + +struct TRestoreSnapshotRequest { + 1: optional string cluster + 2: optional string user + 3: optional string passwd + 4: optional string db + 5: optional string table + 6: optional string token + 7: optional string label_name + 8: optional string repo_name + 9: optional list<TTableRef> table_refs + 10: optional map<string, string> properties + 11: optional binary meta + 12: optional binary job_info +} + +struct TRestoreSnapshotResult { + 1: optional 
Status.TStatus status +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1006,6 +1052,8 @@ service FrontendService { TCommitTxnResult commitTxn(1: TCommitTxnRequest request) TRollbackTxnResult rollbackTxn(1: TRollbackTxnRequest request) TGetBinlogResult getBinlog(1: TGetBinlogRequest request) + TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request) + TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request) TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request) diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift index 7edaecf7fb..0342f9bce0 100644 --- a/gensrc/thrift/Status.thrift +++ b/gensrc/thrift/Status.thrift @@ -91,6 +91,9 @@ enum TStatusCode { BINLOG_TOO_NEW_COMMIT_SEQ = 62, BINLOG_NOT_FOUND_DB = 63, BINLOG_NOT_FOUND_TABLE = 64, + + // Snapshot Related from 70 + SNAPSHOT_NOT_EXIST = 70, } struct TStatus { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
