This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 24145a1c5af [fix](merge cloud) Fix cloud be set be tag map (#32864)
24145a1c5af is described below
commit 24145a1c5af59615a0005f2b00e3262b03e9ab60
Author: yujun <[email protected]>
AuthorDate: Wed Mar 27 14:54:16 2024 +0800
[fix](merge cloud) Fix cloud be set be tag map (#32864)
---
.../doris/cloud/catalog/CloudClusterChecker.java | 35 +++++++++++-----------
.../org/apache/doris/cloud/catalog/CloudEnv.java | 6 ++--
.../apache/doris/cloud/catalog/CloudReplica.java | 2 +-
.../apache/doris/cloud/load/CloudLoadManager.java | 5 ++--
.../doris/cloud/system/CloudSystemInfoService.java | 19 ++++++------
.../java/org/apache/doris/qe/StmtExecutor.java | 3 +-
.../regression/action/StreamLoadAction.groovy | 31 +++++++++++++++++--
7 files changed, 65 insertions(+), 36 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index cc4a3c09d9c..801f3166861 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -94,19 +94,13 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.debug("begin to add clusterId: {}", addId);
}
// Attach tag to BEs
- Map<String, String> newTagMap =
Tag.DEFAULT_BACKEND_TAG.toMap();
String clusterName =
remoteClusterIdToPB.get(addId).getClusterName();
String clusterId =
remoteClusterIdToPB.get(addId).getClusterId();
String publicEndpoint =
remoteClusterIdToPB.get(addId).getPublicEndpoint();
String privateEndpoint =
remoteClusterIdToPB.get(addId).getPrivateEndpoint();
- newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
- newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
- newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
publicEndpoint);
- newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
privateEndpoint);
// For old versions that do no have status field set
ClusterStatus clusterStatus =
remoteClusterIdToPB.get(addId).hasClusterStatus()
? remoteClusterIdToPB.get(addId).getClusterStatus() :
ClusterStatus.NORMAL;
- newTagMap.put(Tag.CLOUD_CLUSTER_STATUS,
String.valueOf(clusterStatus));
MetricRepo.registerCloudMetrics(clusterId, clusterName);
//toAdd.forEach(i -> i.setTagMap(newTagMap));
List<Backend> toAdd = new ArrayList<>();
@@ -117,6 +111,12 @@ public class CloudClusterChecker extends MasterDaemon {
continue;
}
Backend b = new Backend(Env.getCurrentEnv().getNextId(),
addr, node.getHeartbeatPort());
+ Map<String, String> newTagMap =
Tag.DEFAULT_BACKEND_TAG.toMap();
+ newTagMap.put(Tag.CLOUD_CLUSTER_STATUS,
String.valueOf(clusterStatus));
+ newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
publicEndpoint);
+ newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
privateEndpoint);
newTagMap.put(Tag.CLOUD_UNIQUE_ID,
node.getCloudUniqueId());
b.setTagMap(newTagMap);
toAdd.add(b);
@@ -250,13 +250,6 @@ public class CloudClusterChecker extends MasterDaemon {
updateStatus(currentBes, expectedBes);
- // Attach tag to BEs
- Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
- newTagMap.put(Tag.CLOUD_CLUSTER_NAME,
remoteClusterIdToPB.get(cid).getClusterName());
- newTagMap.put(Tag.CLOUD_CLUSTER_ID,
remoteClusterIdToPB.get(cid).getClusterId());
- newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
remoteClusterIdToPB.get(cid).getPublicEndpoint());
- newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
remoteClusterIdToPB.get(cid).getPrivateEndpoint());
-
diffNodes(toAdd, toDel, () -> {
Map<String, Backend> currentMap = new HashMap<>();
for (Backend be : currentBes) {
@@ -280,6 +273,14 @@ public class CloudClusterChecker extends MasterDaemon {
if (node.hasIsSmoothUpgrade()) {
b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
}
+
+ // Attach tag to BEs
+ Map<String, String> newTagMap =
Tag.DEFAULT_BACKEND_TAG.toMap();
+ newTagMap.put(Tag.CLOUD_CLUSTER_NAME,
remoteClusterIdToPB.get(cid).getClusterName());
+ newTagMap.put(Tag.CLOUD_CLUSTER_ID,
remoteClusterIdToPB.get(cid).getClusterId());
+ newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
remoteClusterIdToPB.get(cid).getPublicEndpoint());
+ newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
+ remoteClusterIdToPB.get(cid).getPrivateEndpoint());
newTagMap.put(Tag.CLOUD_UNIQUE_ID,
node.getCloudUniqueId());
b.setTagMap(newTagMap);
nodeMap.put(endpoint, b);
@@ -350,8 +351,8 @@ public class CloudClusterChecker extends MasterDaemon {
}
private void getCloudObserverFes() {
- Cloud.GetClusterResponse response = CloudSystemInfoService
- .getCloudCluster(Config.cloud_sql_server_cluster_name,
Config.cloud_sql_server_cluster_id, "");
+ Cloud.GetClusterResponse response =
cloudSystemInfoService.getCloudCluster(
+ Config.cloud_sql_server_cluster_name,
Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| response.getStatus().getCode() != Cloud.MetaServiceCode.OK)
{
LOG.warn("failed to get cloud cluster due to incomplete response, "
@@ -416,7 +417,7 @@ public class CloudClusterChecker extends MasterDaemon {
return;
}
try {
- CloudSystemInfoService.updateFrontends(toAdd, toDel);
+ cloudSystemInfoService.updateFrontends(toAdd, toDel);
} catch (DdlException e) {
LOG.warn("update cloud frontends exception e: {}, msg: {}", e,
e.getMessage());
}
@@ -426,7 +427,7 @@ public class CloudClusterChecker extends MasterDaemon {
Map<String, List<Backend>> clusterIdToBackend =
cloudSystemInfoService.getCloudClusterIdToBackend();
//rpc to ms, to get mysql user can use cluster_id
// NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info.
- Cloud.GetClusterResponse response =
CloudSystemInfoService.getCloudCluster("", "", "");
+ Cloud.GetClusterResponse response =
cloudSystemInfoService.getCloudCluster("", "", "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
&& response.getStatus().getCode() !=
MetaServiceCode.CLUSTER_NOT_FOUND)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 613fef3be68..7c37f1dbcff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -83,8 +83,8 @@ public class CloudEnv extends Env {
private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
// get helperNodes from ms
- Cloud.GetClusterResponse response =
CloudSystemInfoService.getCloudCluster(
- Config.cloud_sql_server_cluster_name,
Config.cloud_sql_server_cluster_id, "");
+ Cloud.GetClusterResponse response = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCloudCluster(Config.cloud_sql_server_cluster_name,
Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| response.getStatus().getCode() != Cloud.MetaServiceCode.OK)
{
LOG.warn("failed to get cloud cluster due to incomplete response, "
@@ -392,7 +392,7 @@ public class CloudEnv extends Env {
public void changeCloudCluster(String clusterName, ConnectContext ctx)
throws DdlException {
checkCloudClusterPriv(clusterName);
- CloudSystemInfoService.waitForAutoStart(clusterName);
+ ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).waitForAutoStart(clusterName);
try {
((CloudSystemInfoService)
Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
} catch (UserException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index a51e332a784..aebc66128ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -163,7 +163,7 @@ public class CloudReplica extends Replica {
// if cluster is SUSPENDED, wait
try {
- CloudSystemInfoService.waitForAutoStart(cluster);
+ ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
} catch (DdlException e) {
// this function cant throw exception. so just log it
LOG.warn("cant resume cluster {}", cluster);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index 1d0bfc23f6c..5caa2108c59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.load;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
@@ -33,14 +34,14 @@ public class CloudLoadManager extends LoadManager {
@Override
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException,
UserException {
- CloudSystemInfoService.waitForAutoStartCurrentCluster();
+ ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
return super.createLoadJobFromStmt(stmt);
}
@Override
public long createLoadJobFromStmt(InsertStmt stmt) throws DdlException {
- CloudSystemInfoService.waitForAutoStartCurrentCluster();
+ ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
return super.createLoadJobFromStmt(stmt);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 7171fbafd16..59ba7340dfc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -89,7 +89,7 @@ public class CloudSystemInfoService extends SystemInfoService
{
* @param clusterId cluster id
* @return
*/
- public static Cloud.GetClusterResponse getCloudCluster(String clusterName,
String clusterId, String userName) {
+ public Cloud.GetClusterResponse getCloudCluster(String clusterName, String
clusterId, String userName) {
Cloud.GetClusterRequest.Builder builder =
Cloud.GetClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id)
.setClusterName(clusterName).setClusterId(clusterId).setMysqlUserName(userName);
@@ -261,8 +261,8 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
- public static synchronized void updateFrontends(List<Frontend> toAdd,
- List<Frontend> toDel)
throws DdlException {
+ public synchronized void updateFrontends(List<Frontend> toAdd,
List<Frontend> toDel)
+ throws DdlException {
if (LOG.isDebugEnabled()) {
LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
}
@@ -570,7 +570,7 @@ public class CloudSystemInfoService extends
SystemInfoService {
this.instanceStatus = instanceStatus;
}
- public static void waitForAutoStartCurrentCluster() throws DdlException {
+ public void waitForAutoStartCurrentCluster() throws DdlException {
ConnectContext context = ConnectContext.get();
if (context != null) {
String cloudCluster = context.getCloudCluster();
@@ -580,7 +580,7 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
}
- public static String getClusterNameAutoStart(final String clusterName) {
+ public String getClusterNameAutoStart(final String clusterName) {
if (!Strings.isNullOrEmpty(clusterName)) {
return clusterName;
}
@@ -607,7 +607,7 @@ public class CloudSystemInfoService extends
SystemInfoService {
return cloudClusterTypeAndName.clusterName;
}
- public static void waitForAutoStart(String clusterName) throws
DdlException {
+ public void waitForAutoStart(String clusterName) throws DdlException {
if (Config.isNotCloudMode()) {
return;
}
@@ -616,7 +616,7 @@ public class CloudSystemInfoService extends
SystemInfoService {
LOG.warn("auto start in cloud mode, but clusterName empty {}",
clusterName);
return;
}
- String clusterStatus = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+ String clusterStatus = getCloudStatusByName(clusterName);
if (Strings.isNullOrEmpty(clusterStatus)) {
// for cluster rename or cluster dropped
LOG.warn("cant find clusterStatus in fe, clusterName {}",
clusterName);
@@ -631,8 +631,7 @@ public class CloudSystemInfoService extends
SystemInfoService {
builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);
Cloud.ClusterPB.Builder clusterBuilder =
Cloud.ClusterPB.newBuilder();
- clusterBuilder.setClusterId(((CloudSystemInfoService)
-
Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName));
+ clusterBuilder.setClusterId(getCloudClusterIdByName(clusterName));
clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
builder.setCluster(clusterBuilder);
@@ -671,7 +670,7 @@ public class CloudSystemInfoService extends
SystemInfoService {
} catch (InterruptedException e) {
LOG.info("change cluster sleep wait InterruptedException: ",
e);
}
- clusterStatus = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+ clusterStatus = getCloudStatusByName(clusterName);
}
if (retryTime >= retryTimes) {
// auto start timeout
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 40af03d9237..783323b03fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -820,7 +820,8 @@ public class StmtExecutor {
deadCloudClusterStatus);
if
(Strings.isNullOrEmpty(deadCloudClusterStatus)
||
ClusterStatus.valueOf(deadCloudClusterStatus) != ClusterStatus.NORMAL) {
-
CloudSystemInfoService.waitForAutoStart(deadCloudClusterClusterName);
+ ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+
.waitForAutoStart(deadCloudClusterClusterName);
}
}
}
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index 56c80e88a40..606b9bc4ac8 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -54,6 +54,7 @@ class StreamLoadAction implements SuiteAction {
Map<String, String> headers
SuiteContext context
boolean directToBe = false
+ boolean twoPhaseCommit = false
StreamLoadAction(SuiteContext context) {
this.address = context.getFeHttpAddress()
@@ -137,6 +138,22 @@ class StreamLoadAction implements SuiteAction {
this.time = time.call()
}
+ void twoPhaseCommit(boolean twoPhaseCommit) {
+ this.twoPhaseCommit = twoPhaseCommit;
+ }
+
+ void twoPhaseCommit(Closure<Boolean> twoPhaseCommit) {
+ this.twoPhaseCommit = twoPhaseCommit.call();
+ }
+
+ // compatible with selectdb case
+ void isCloud(boolean isCloud) {
+ }
+
+ // compatible with selectdb case
+ void isCloud(Closure<Boolean> isCloud) {
+ }
+
void check(@ClosureParams(value = FromString, options =
["String,Throwable,Long,Long"]) Closure check) {
this.check = check
}
@@ -156,8 +173,14 @@ class StreamLoadAction implements SuiteAction {
long startTime = System.currentTimeMillis()
def isHttpStream = headers.containsKey("version")
try {
- def uri = isHttpStream ?
"http://${address.hostString}:${address.port}/api/_http_stream"
- :
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
+ def uri = ""
+ if (isHttpStream) {
+ uri =
"http://${address.hostString}:${address.port}/api/_http_stream"
+ } else if (twoPhaseCommit) {
+ uri =
"http://${address.hostString}:${address.port}/api/${db}/_stream_load_2pc"
+ } else {
+ uri =
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
+ }
HttpClients.createDefault().withCloseable { client ->
RequestBuilder requestBuilder =
prepareRequestHeader(RequestBuilder.put(uri))
HttpEntity httpEntity = prepareHttpEntity(client)
@@ -362,6 +385,10 @@ class StreamLoadAction implements SuiteAction {
def jsonSlurper = new JsonSlurper()
def parsed = jsonSlurper.parseText(responseText)
String status = parsed.Status
+ if (twoPhaseCommit) {
+ status = parsed.status
+ return status;
+ }
long txnId = parsed.TxnId
if (!status.equalsIgnoreCase("Publish Timeout")) {
return status;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]