This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 37253328bea [feat](http) Add sync and export cloud meta API (#60739)
37253328bea is described below
commit 37253328bea712ae387dff140606eccd4c551ab0
Author: walter <[email protected]>
AuthorDate: Sat Feb 28 11:03:31 2026 +0800
[feat](http) Add sync and export cloud meta API (#60739)
---
cloud/src/common/bvars.cpp | 5 +
cloud/src/common/bvars.h | 3 +
cloud/src/meta-service/meta_service.h | 11 +
cloud/src/meta-service/meta_service_txn.cpp | 56 ++++
cloud/test/meta_service_test.cpp | 17 ++
.../doris/cloud/persist/CloudMetaSyncPoint.java | 69 +++++
.../apache/doris/cloud/rpc/MetaServiceClient.java | 5 +
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 5 +
.../httpv2/rest/manager/MetaBackupAction.java | 329 +++++++++++++++++++++
.../org/apache/doris/journal/JournalEntity.java | 6 +
.../java/org/apache/doris/persist/EditLog.java | 10 +
.../org/apache/doris/persist/OperationType.java | 1 +
gensrc/proto/cloud.proto | 12 +
13 files changed, 529 insertions(+)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 91b1e0bb649..f2dd86d8d1c 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -39,6 +39,7 @@ BvarLatencyRecorderWithTag
g_bvar_ms_commit_txn_eventually("ms", "commit_txn_eve
BvarLatencyRecorderWithTag g_bvar_ms_abort_txn("ms", "abort_txn");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn("ms", "get_txn");
BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms",
"get_current_max_txn_id");
+BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point("ms",
"create_meta_sync_point");
BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms",
"check_txn_conflict");
@@ -467,6 +468,8 @@ mBvarInt64Adder
g_bvar_rpc_kv_abort_txn_del_counter("rpc_kv_abort_txn_del_counte
mBvarInt64Adder
g_bvar_rpc_kv_get_txn_get_counter("rpc_kv_get_txn_get_counter",{"instance_id"});
// get_current_max_txn_id
mBvarInt64Adder
g_bvar_rpc_kv_get_current_max_txn_id_get_counter("rpc_kv_get_current_max_txn_id_get_counter",{"instance_id"});
+// create_meta_sync_point
+mBvarInt64Adder
g_bvar_rpc_kv_create_meta_sync_point_del_counter("rpc_kv_create_meta_sync_point_del_counter",{"instance_id"});
// begin_sub_txn
mBvarInt64Adder
g_bvar_rpc_kv_begin_sub_txn_get_counter("rpc_kv_begin_sub_txn_get_counter",{"instance_id"});
mBvarInt64Adder
g_bvar_rpc_kv_begin_sub_txn_put_counter("rpc_kv_begin_sub_txn_put_counter",{"instance_id"});
@@ -669,6 +672,8 @@ mBvarInt64Adder
g_bvar_rpc_kv_abort_txn_del_bytes("rpc_kv_abort_txn_del_bytes",{
mBvarInt64Adder
g_bvar_rpc_kv_get_txn_get_bytes("rpc_kv_get_txn_get_bytes",{"instance_id"});
// get_current_max_txn_id
mBvarInt64Adder
g_bvar_rpc_kv_get_current_max_txn_id_get_bytes("rpc_kv_get_current_max_txn_id_get_bytes",{"instance_id"});
+// create_meta_sync_point
+mBvarInt64Adder
g_bvar_rpc_kv_create_meta_sync_point_del_bytes("rpc_kv_create_meta_sync_point_del_bytes",{"instance_id"});
// begin_sub_txn
mBvarInt64Adder
g_bvar_rpc_kv_begin_sub_txn_get_bytes("rpc_kv_begin_sub_txn_get_bytes",{"instance_id"});
mBvarInt64Adder
g_bvar_rpc_kv_begin_sub_txn_put_bytes("rpc_kv_begin_sub_txn_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index e4b9789c1bf..695a9c0206b 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -551,6 +551,7 @@ extern BvarLatencyRecorderWithTag
g_bvar_ms_commit_txn_eventually;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
+extern BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator;
@@ -887,6 +888,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_counter;
@@ -1026,6 +1028,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index f26dc3ad360..2e4e2790f4b 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -125,6 +125,10 @@ public:
const GetCurrentMaxTxnRequest* request,
GetCurrentMaxTxnResponse* response,
::google::protobuf::Closure* done) override;
+ void create_meta_sync_point(::google::protobuf::RpcController* controller,
+ const CreateMetaSyncPointRequest* request,
+ CreateMetaSyncPointResponse* response,
+ ::google::protobuf::Closure* done) override;
void begin_sub_txn(::google::protobuf::RpcController* controller,
const BeginSubTxnRequest* request, BeginSubTxnResponse*
response,
@@ -570,6 +574,13 @@ public:
call_impl(&cloud::MetaService::get_current_max_txn_id, controller,
request, response, done);
}
+ void create_meta_sync_point(::google::protobuf::RpcController* controller,
+ const CreateMetaSyncPointRequest* request,
+ CreateMetaSyncPointResponse* response,
+ ::google::protobuf::Closure* done) override {
+ call_impl(&cloud::MetaService::create_meta_sync_point, controller,
request, response, done);
+ }
+
void begin_sub_txn(::google::protobuf::RpcController* controller,
const BeginSubTxnRequest* request, BeginSubTxnResponse*
response,
::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index e707883105b..cd7b62e7e6d 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -46,6 +46,8 @@ using namespace std::chrono;
namespace doris::cloud {
+static constexpr std::string_view kMetaSyncPointDummyKey =
"__meta_service_sync_point_dummy_key__";
+
struct TableStats {
int64_t updated_row_count = 0;
@@ -3628,6 +3630,60 @@ void
MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
response->set_current_max_txn_id(current_max_txn_id);
}
+void
MetaServiceImpl::create_meta_sync_point(::google::protobuf::RpcController*
controller,
+ const CreateMetaSyncPointRequest*
request,
+ CreateMetaSyncPointResponse*
response,
+ ::google::protobuf::Closure*
done) {
+ RPC_PREPROCESS(create_meta_sync_point, del);
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+ RPC_RATE_LIMIT(create_meta_sync_point)
+
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ msg = "failed to create txn";
+ code = cast_as<ErrCategory::CREATE>(err);
+ return;
+ }
+
+ txn->enable_get_versionstamp();
+ txn->remove(kMetaSyncPointDummyKey);
+
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "txn->commit() failed, err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ int64_t committed_version = 0;
+ err = txn->get_committed_version(&committed_version);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "get committed version failed, err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ Versionstamp versionstamp;
+ err = txn->get_versionstamp(&versionstamp);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "get versionstamp failed, err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ response->set_committed_version(committed_version);
+ response->set_versionstamp(versionstamp.to_string());
+}
+
/**
* 1. Generate a sub_txn_id
*
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index f2f623cf1c1..f815c0d3244 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -2671,6 +2671,23 @@ TEST(MetaServiceTest, GetCurrentMaxTxnIdTest) {
ASSERT_GE(max_txn_id_res.current_max_txn_id(), begin_txn_res.txn_id());
}
+TEST(MetaServiceTest, CreateMetaSyncPointTest) {
+ auto meta_service = get_meta_service();
+ const std::string cloud_unique_id = "test_cloud_unique_id";
+
+ brpc::Controller cntl;
+ CreateMetaSyncPointRequest req;
+ CreateMetaSyncPointResponse resp;
+ req.set_cloud_unique_id(cloud_unique_id);
+
+ meta_service->create_meta_sync_point(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
&resp, nullptr);
+
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+ ASSERT_GT(resp.committed_version(), 0);
+ ASSERT_EQ(resp.versionstamp().size(), 20);
+}
+
TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) {
auto meta_service = get_meta_service();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java
new file mode 100644
index 00000000000..e226bad144d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class CloudMetaSyncPoint implements Writable {
+ @SerializedName(value = "committedVersion")
+ private long committedVersion;
+
+ @SerializedName(value = "versionStamp")
+ private String versionStamp;
+
+ @SerializedName(value = "createTimeMs")
+ private long createTimeMs;
+
+ public CloudMetaSyncPoint() {
+ }
+
+ public CloudMetaSyncPoint(long committedVersion, String versionStamp, long
createTimeMs) {
+ this.committedVersion = committedVersion;
+ this.versionStamp = versionStamp;
+ this.createTimeMs = createTimeMs;
+ }
+
+ public long getCommittedVersion() {
+ return committedVersion;
+ }
+
+ public String getVersionStamp() {
+ return versionStamp;
+ }
+
+ public long getCreateTimeMs() {
+ return createTimeMs;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static CloudMetaSyncPoint read(DataInput in) throws IOException {
+ return GsonUtils.GSON.fromJson(Text.readString(in),
CloudMetaSyncPoint.class);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index a27a823c7c1..2f85ef17a8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -221,6 +221,11 @@ public class MetaServiceClient {
.getCurrentMaxTxnId(request);
}
+ public Cloud.CreateMetaSyncPointResponse
createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request) {
+ return
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms,
TimeUnit.MILLISECONDS)
+ .createMetaSyncPoint(request);
+ }
+
public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest
request) {
if (!request.hasCloudUniqueId()) {
Cloud.BeginSubTxnRequest.Builder builder =
Cloud.BeginSubTxnRequest.newBuilder();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index e1cb45401db..90e3a9276cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -387,6 +387,11 @@ public class MetaServiceProxy {
return executeWithMetrics("getCurrentMaxTxnId", (client) ->
client.getCurrentMaxTxnId(request));
}
+ public Cloud.CreateMetaSyncPointResponse
createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request)
+ throws RpcException {
+ return executeWithMetrics("createMetaSyncPoint", (client) ->
client.createMetaSyncPoint(request));
+ }
+
public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest
request)
throws RpcException {
return executeWithMetrics("beginSubTxn", (client) ->
client.beginSubTxn(request));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java
new file mode 100644
index 00000000000..71a053e25fc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest.manager;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.persist.CloudMetaSyncPoint;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.httpv2.rest.RestBaseController;
+import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.bdbje.BDBJEJournal;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.Storage;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.util.DbBackup;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import org.apache.commons.io.FileUtils;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/rest/v2/manager/backup")
+public class MetaBackupAction extends RestBaseController {
+ private static final String ALLOW_REDIRECT = "allow_redirect";
+
+ @PostMapping("/sync_cloud_meta")
+ public Object syncCloudMeta(HttpServletRequest request,
HttpServletResponse response) {
+ if (!Config.isCloudMode()) {
+ return ResponseEntityBuilder.okWithCommonError("/sync_cloud_meta
only works on the cloud mode");
+ }
+ try {
+ if (needRedirect(request.getScheme())) {
+ return redirectToHttps(request);
+ }
+ executeCheckPassword(request, response);
+
checkGlobalAuth(org.apache.doris.qe.ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
+ Object redirectOrError = checkMasterAndRedirectIfNeeded(request,
response);
+ if (redirectOrError != null) {
+ return redirectOrError;
+ }
+
+ synchronized (Env.getCurrentEnv().getEditLog()) {
+ MetaSyncPointVersion syncVersion = createMetaSyncPoint();
+ CloudMetaSyncPoint syncPoint = new
CloudMetaSyncPoint(syncVersion.committedVersion,
+ syncVersion.versionStamp,
+ System.currentTimeMillis());
+ long journalId =
Env.getCurrentEnv().getEditLog().logMetaSyncPoint(syncPoint);
+
+ Map<String, Object> data = new HashMap<>();
+ data.put("journal_id", journalId);
+ data.put("committed_version", syncVersion.committedVersion);
+ data.put("versionstamp", syncVersion.versionStamp);
+ return ResponseEntityBuilder.ok(data);
+ }
+ } catch (Exception e) {
+ return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+ }
+ }
+
+ @PostMapping("/export_meta")
+ public Object exportMeta(@RequestBody ExportMetaRequest req,
+ HttpServletRequest request, HttpServletResponse response) {
+ if (!Config.isCloudMode()) {
+ return ResponseEntityBuilder.okWithCommonError("/export_meta only
works on the cloud mode");
+ }
+ try {
+ if (needRedirect(request.getScheme())) {
+ return redirectToHttps(request);
+ }
+ executeCheckPassword(request, response);
+
checkGlobalAuth(org.apache.doris.qe.ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
+ Object redirectOrError = checkMasterAndRedirectIfNeeded(request,
response);
+ if (redirectOrError != null) {
+ return redirectOrError;
+ }
+
+ if (req == null || Strings.isNullOrEmpty(req.getTargetDir())) {
+ return ResponseEntityBuilder.badRequest("target_dir is
required");
+ }
+ File targetDir = prepareTargetDir(req.getTargetDir());
+ if (Env.getCurrentEnv().getCheckpointer() != null) {
+
Env.getCurrentEnv().getCheckpointer().getLock().readLock().lock();
+ }
+ try {
+ CopiedImage copiedImage = copyLatestImageIfExists(targetDir);
+ copyImageMetaFiles(targetDir);
+ BdbExportResult bdbResult = exportBdbJe(targetDir,
copiedImage.version, copiedImage.exists);
+
+ Map<String, Object> data = new HashMap<>();
+ data.put("target_dir", targetDir.getAbsolutePath());
+ data.put("bdb_dir", new File(targetDir,
"bdb").getAbsolutePath());
+ data.put("bdb_file_count", bdbResult.fileCount);
+ data.put("image_file", copiedImage.exists ?
copiedImage.file.getName() : null);
+ data.put("image_version", copiedImage.version);
+ data.put("image_exported", copiedImage.exists);
+ data.put("journal_upper_bound", bdbResult.journalUpperBound);
+ return ResponseEntityBuilder.ok(data);
+ } finally {
+ if (Env.getCurrentEnv().getCheckpointer() != null) {
+
Env.getCurrentEnv().getCheckpointer().getLock().readLock().unlock();
+ }
+ }
+ } catch (Exception e) {
+ return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+ }
+ }
+
+ private Object checkMasterAndRedirectIfNeeded(HttpServletRequest request,
HttpServletResponse response)
+ throws Exception {
+ if (Env.getCurrentEnv().isMaster()) {
+ return null;
+ }
+ if (Boolean.parseBoolean(request.getParameter(ALLOW_REDIRECT))) {
+ return redirectToMasterOrException(request, response);
+ }
+ return ResponseEntityBuilder.okWithCommonError(
+ "current fe is not master, master is "
+ + Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort());
+ }
+
+ private MetaSyncPointVersion createMetaSyncPoint() throws DdlException {
+ Cloud.CreateMetaSyncPointRequest req =
Cloud.CreateMetaSyncPointRequest.newBuilder()
+ .setCloudUniqueId(Config.cloud_unique_id)
+ .setRequestIp(FrontendOptions.getLocalHostAddressCached())
+ .build();
+ try {
+ Cloud.CreateMetaSyncPointResponse resp =
MetaServiceProxy.getInstance().createMetaSyncPoint(req);
+ if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ throw new DdlException("create_meta_sync_point failed: " +
resp.getStatus().getMsg());
+ }
+ if (!resp.hasCommittedVersion()) {
+ throw new DdlException("meta service response missing
committed_version");
+ }
+ if (!resp.hasVersionstamp() ||
Strings.isNullOrEmpty(resp.getVersionstamp())) {
+ throw new DdlException("meta service response missing
versionstamp");
+ }
+ return new MetaSyncPointVersion(resp.getCommittedVersion(),
resp.getVersionstamp());
+ } catch (RpcException e) {
+ throw new DdlException("create_meta_sync_point rpc failed: " +
e.getMessage());
+ }
+ }
+
+ private static class MetaSyncPointVersion {
+ private final long committedVersion;
+ private final String versionStamp;
+
+ MetaSyncPointVersion(long committedVersion, String versionStamp) {
+ this.committedVersion = committedVersion;
+ this.versionStamp = versionStamp;
+ }
+ }
+
+ private static File prepareTargetDir(String targetDir) throws IOException {
+ File dir = new File(targetDir).getCanonicalFile();
+ if (dir.exists()) {
+ if (!dir.isDirectory()) {
+ throw new IOException("target_dir exists but is not a
directory: " + dir.getAbsolutePath());
+ }
+ FileUtils.cleanDirectory(dir);
+ } else {
+ FileUtils.forceMkdir(dir);
+ }
+ return dir;
+ }
+
+ private BdbExportResult exportBdbJe(File targetDir, long imageVersion,
boolean hasImage) throws Exception {
+ if (!"bdb".equalsIgnoreCase(Config.edit_log_type)) {
+ throw new DdlException("only bdb edit_log_type supports bdbje
export");
+ }
+ Journal journal = Env.getCurrentEnv().getEditLog().getJournal();
+ if (!(journal instanceof BDBJEJournal)) {
+ throw new DdlException("current edit log is not BDBJEJournal");
+ }
+
+ BDBJEJournal bdbjeJournal = (BDBJEJournal) journal;
+ if (bdbjeJournal.getBDBEnvironment() == null) {
+ throw new DdlException("bdb environment is not initialized");
+ }
+ ReplicatedEnvironment replicatedEnvironment =
bdbjeJournal.getBDBEnvironment().getReplicatedEnvironment();
+ if (replicatedEnvironment == null) {
+ throw new DdlException("bdb replicated environment is not ready");
+ }
+
+ File bdbTargetDir = new File(targetDir, "bdb");
+ FileUtils.forceMkdir(bdbTargetDir);
+ File bdbSourceDir = new File(Env.getCurrentEnv().getBdbDir());
+
+ DbBackup backup = new DbBackup(replicatedEnvironment);
+ backup.startBackup();
+ try {
+ long journalUpperBound = bdbjeJournal.getMaxJournalId();
+ if (hasImage) {
+ long journalMinId = bdbjeJournal.getMinJournalId();
+ if (journalMinId > 0 && journalMinId > imageVersion + 1) {
+ throw new DdlException("export failed: bdb min journal id
" + journalMinId
+ + " is greater than image_version + 1 (" +
(imageVersion + 1) + ")");
+ }
+ if (journalUpperBound < imageVersion) {
+ throw new DdlException("export failed: bdb journal upper
bound " + journalUpperBound
+ + " is smaller than image_version " +
imageVersion);
+ }
+ }
+ String[] files = backup.getLogFilesInBackupSet();
+ for (String fileName : files) {
+ FileUtils.copyFile(new File(bdbSourceDir, fileName), new
File(bdbTargetDir, fileName));
+ }
+ return new BdbExportResult(files.length, journalUpperBound);
+ } finally {
+ backup.endBackup();
+ }
+ }
+
+ private CopiedImage copyLatestImageIfExists(File targetDir) throws
IOException {
+ File imageTargetDir = new File(targetDir, "image");
+ Storage storage = new Storage(Env.getServingEnv().getImageDir());
+ long imageVersion = storage.getLatestImageSeq();
+ File image = storage.getImageFile(imageVersion);
+ if (!image.exists()) {
+ return CopiedImage.notFound(imageVersion);
+ }
+ File targetImage = new File(imageTargetDir, image.getName());
+ linkOrCopyFile(image, targetImage);
+ return CopiedImage.found(targetImage, imageVersion);
+ }
+
+ private void copyImageMetaFiles(File targetDir) throws IOException {
+ File imageTargetDir = new File(targetDir, "image");
+ Storage storage = new Storage(Env.getServingEnv().getImageDir());
+ File[] metaFiles = new File[] {
+ storage.getModeFile(),
+ storage.getRoleFile(),
+ storage.getVersionFile()
+ };
+ for (File source : metaFiles) {
+ if (!source.exists()) {
+ continue;
+ }
+ linkOrCopyFile(source, new File(imageTargetDir, source.getName()));
+ }
+ }
+
+ private void linkOrCopyFile(File source, File target) throws IOException {
+ try {
+ Files.createLink(target.toPath(), source.toPath());
+ } catch (UnsupportedOperationException | SecurityException |
FileAlreadyExistsException e) {
+ FileUtils.copyFile(source, target);
+ } catch (IOException e) {
+ FileUtils.copyFile(source, target);
+ }
+ }
+
+ private static class CopiedImage {
+ private final File file;
+ private final long version;
+ private final boolean exists;
+
+ CopiedImage(File file, long version, boolean exists) {
+ this.file = file;
+ this.version = version;
+ this.exists = exists;
+ }
+
+ static CopiedImage found(File file, long version) {
+ return new CopiedImage(file, version, true);
+ }
+
+ static CopiedImage notFound(long version) {
+ return new CopiedImage(null, version, false);
+ }
+ }
+
+ private static class BdbExportResult {
+ private final int fileCount;
+ private final long journalUpperBound;
+
+ BdbExportResult(int fileCount, long journalUpperBound) {
+ this.fileCount = fileCount;
+ this.journalUpperBound = journalUpperBound;
+ }
+ }
+
+ public static class ExportMetaRequest {
+ @JsonAlias({"targetDir"})
+ @JsonProperty("target_dir")
+ private String targetDir;
+
+ public String getTargetDir() {
+ return targetDir;
+ }
+
+ public void setTargetDir(String targetDir) {
+ this.targetDir = targetDir;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 76b4578f892..8f5e3f17865 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSearchDesc;
import org.apache.doris.catalog.Resource;
import org.apache.doris.cloud.CloudWarmUpJob;
+import org.apache.doris.cloud.persist.CloudMetaSyncPoint;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.snapshot.SnapshotState;
import org.apache.doris.cluster.Cluster;
@@ -990,6 +991,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_META_SYNC_POINT: {
+ data = CloudMetaSyncPoint.read(in);
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index d506b474ed7..c7384c7e715 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.FunctionSearchDesc;
import org.apache.doris.catalog.Resource;
import org.apache.doris.cloud.CloudWarmUpJob;
import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.persist.CloudMetaSyncPoint;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.snapshot.SnapshotState;
import org.apache.doris.common.Config;
@@ -1424,6 +1425,11 @@ public class EditLog {
// TODO: implement
break;
}
+ case OperationType.OP_META_SYNC_POINT: {
+ // CloudMetaSyncPoint info = (CloudMetaSyncPoint)
journal.getData();
+ // This log is only used to keep FE/MS cut point in
journal timeline.
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}, log id: {}", opCode,
logId, e);
@@ -2537,4 +2543,8 @@ public class EditLog {
public long logBeginSnapshot(SnapshotState snapshotState) {
return logEdit(OperationType.OP_BEGIN_SNAPSHOT, snapshotState);
}
+
+ public long logMetaSyncPoint(CloudMetaSyncPoint syncPoint) {
+ return logEdit(OperationType.OP_META_SYNC_POINT, syncPoint);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 1174d9c3874..016903129c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -426,6 +426,7 @@ public class OperationType {
public static final short OP_MODIFY_CLOUD_WARM_UP_JOB = 1002;
public static final short OP_BEGIN_SNAPSHOT = 1100;
+ public static final short OP_META_SYNC_POINT = 1101;
/**
* Get opcode name by op code.
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 6c14b3b1ee4..1f61fcab17d 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1113,6 +1113,17 @@ message GetCurrentMaxTxnResponse {
optional int64 current_max_txn_id = 2;
}
+message CreateMetaSyncPointRequest {
+ optional string cloud_unique_id = 1; // For auth
+ optional string request_ip = 2;
+}
+
+message CreateMetaSyncPointResponse {
+ optional MetaServiceResponseStatus status = 1;
+ optional int64 committed_version = 2;
+ optional string versionstamp = 3;
+}
+
message AbortTxnWithCoordinatorRequest {
optional string cloud_unique_id = 1; // For auth
optional string ip = 2;
@@ -2234,6 +2245,7 @@ service MetaService {
rpc abort_txn(AbortTxnRequest) returns (AbortTxnResponse);
rpc get_txn(GetTxnRequest) returns (GetTxnResponse);
rpc get_current_max_txn_id(GetCurrentMaxTxnRequest) returns
(GetCurrentMaxTxnResponse);
+ rpc create_meta_sync_point(CreateMetaSyncPointRequest) returns
(CreateMetaSyncPointResponse);
rpc check_txn_conflict(CheckTxnConflictRequest) returns
(CheckTxnConflictResponse);
rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]