This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9e3a75b7484 [feature](cloud) multi cloud cluster (#31749)
9e3a75b7484 is described below
commit 9e3a75b74847d867fce9991f04de4b190d747008
Author: yujun <[email protected]>
AuthorDate: Sun Mar 17 23:54:03 2024 +0800
[feature](cloud) multi cloud cluster (#31749)
---
be/src/io/fs/multi_table_pipe.cpp | 2 +
.../routine_load/routine_load_task_executor.cpp | 6 +
be/src/runtime/stream_load/stream_load_context.h | 4 +
.../java/org/apache/doris/analysis/LoadStmt.java | 2 +
.../apache/doris/analysis/NativeInsertStmt.java | 5 +-
.../apache/doris/analysis/SetUserPropertyVar.java | 11 ++
.../main/java/org/apache/doris/catalog/Env.java | 4 +-
.../java/org/apache/doris/catalog/EnvFactory.java | 23 +++
.../java/org/apache/doris/catalog/OlapTable.java | 19 +++
.../main/java/org/apache/doris/catalog/Table.java | 1 +
.../org/apache/doris/cloud/catalog/CloudEnv.java | 2 +-
.../doris/cloud/catalog/CloudEnvFactory.java | 30 ++++
.../doris/cloud/load/CloudBrokerLoadJob.java | 181 +++++++++++++++++++++
.../apache/doris/cloud/load/CloudLoadManager.java | 49 ++++++
.../doris/cloud/load/CloudRoutineLoadManager.java | 66 ++++++++
.../cloud/planner/CloudGroupCommitPlanner.java | 81 +++++++++
.../apache/doris/cloud/qe/CloudCoordinator.java | 4 +-
.../doris/cloud/system/CloudSystemInfoService.java | 43 ++++-
.../transaction/CloudGlobalTransactionMgr.java | 2 +-
.../java/org/apache/doris/common/ErrorCode.java | 5 +-
.../apache/doris/datasource/ExternalScanNode.java | 3 +-
.../doris/datasource/FederationBackendPolicy.java | 6 +-
.../org/apache/doris/httpv2/rest/LoadAction.java | 139 ++++++++++++++--
.../doris/httpv2/rest/manager/ClusterAction.java | 37 +++++
.../apache/doris/load/loadv2/BrokerLoadJob.java | 24 ++-
.../org/apache/doris/load/loadv2/JobState.java | 3 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 5 +-
.../apache/doris/load/loadv2/LoadLoadingTask.java | 1 +
.../org/apache/doris/load/loadv2/LoadTask.java | 23 +++
.../apache/doris/load/loadv2/MysqlLoadManager.java | 11 ++
.../doris/load/routineload/KafkaTaskInfo.java | 2 +
.../doris/load/routineload/RoutineLoadJob.java | 64 ++++++++
.../doris/load/routineload/RoutineLoadManager.java | 5 +-
.../org/apache/doris/mysql/privilege/Auth.java | 4 +-
.../plans/commands/insert/GroupCommitInserter.java | 4 +-
.../apache/doris/planner/GroupCommitPlanner.java | 59 ++++---
.../org/apache/doris/planner/OlapScanNode.java | 13 +-
.../org/apache/doris/planner/OlapTableSink.java | 12 +-
.../org/apache/doris/plugin/audit/AuditEvent.java | 7 +
.../java/org/apache/doris/qe/AuditLogHelper.java | 4 +
.../java/org/apache/doris/qe/ConnectContext.java | 24 ++-
.../main/java/org/apache/doris/qe/Coordinator.java | 5 +-
.../java/org/apache/doris/qe/MasterOpExecutor.java | 6 +
.../java/org/apache/doris/qe/SessionVariable.java | 14 ++
.../java/org/apache/doris/qe/ShowExecutor.java | 7 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 179 +++++++++++++++++++-
.../apache/doris/service/FrontendServiceImpl.java | 82 ++++++++++
.../main/java/org/apache/doris/system/Backend.java | 22 +++
.../org/apache/doris/system/BackendHbResponse.java | 15 +-
.../org/apache/doris/system/BeSelectionPolicy.java | 4 +-
.../java/org/apache/doris/system/HeartbeatMgr.java | 7 +-
.../org/apache/doris/system/SystemInfoService.java | 27 ++-
.../load/routineload/RoutineLoadManagerTest.java | 6 +-
.../load/routineload/RoutineLoadSchedulerTest.java | 4 +-
.../apache/doris/system/SystemInfoServiceTest.java | 2 +-
.../doris/utframe/DemoMultiBackendsTest.java | 3 +-
gensrc/thrift/BackendService.thrift | 2 +
.../cloud_p0/conf/regression-conf-custom.groovy | 15 +-
.../pipeline/p0/conf/regression-conf.groovy | 6 +-
regression-test/plugins/plugin_cluster.groovy | 180 ++++++++++++++++++++
60 files changed, 1476 insertions(+), 100 deletions(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp
b/be/src/io/fs/multi_table_pipe.cpp
index 916f8151739..6b41bf6988b 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -165,6 +165,8 @@ Status MultiTablePipe::request_and_exec_plans() {
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
+ request.__set_user(_ctx->qualified_user);
+ request.__set_cloud_cluster(_ctx->cloud_cluster);
// no need to register new_load_stream_mgr coz it is already done in
routineload submit task
// plan this load
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d22f2bb4a8c..3e5eb48afbc 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -216,6 +216,12 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
if (task.__isset.memtable_on_sink_node) {
ctx->memtable_on_sink_node = task.memtable_on_sink_node;
}
+ if (task.__isset.qualified_user) {
+ ctx->qualified_user = task.qualified_user;
+ }
+ if (task.__isset.cloud_cluster) {
+ ctx->cloud_cluster = task.cloud_cluster;
+ }
// set execute plan params (only for non-single-stream-multi-table load)
TStreamLoadPutResult put_result;
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index 376228e022d..1461e863d5d 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -236,6 +236,10 @@ public:
bool memtable_on_sink_node = false;
+ // use for cloud cluster mode
+ std::string qualified_user;
+ std::string cloud_cluster;
+
public:
ExecEnv* exec_env() { return _exec_env; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 01a4490003b..19b72129516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -125,6 +125,8 @@ public class LoadStmt extends DdlStmt {
public static final String KEY_COMMENT = "comment";
+ public static final String KEY_CLOUD_CLUSTER = "cloud_cluster";
+
public static final String KEY_ENCLOSE = "enclose";
public static final String KEY_ESCAPE = "escape";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 193ce3abc80..3f367464441 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MysqlTable;
@@ -1247,8 +1248,8 @@ public class NativeInsertStmt extends InsertStmt {
this.analyzer = analyzerTmp;
}
analyzeSubquery(analyzer, true);
- groupCommitPlanner = new GroupCommitPlanner((Database) db,
olapTable, targetColumnNames, queryId,
-
ConnectContext.get().getSessionVariable().getGroupCommit());
+ groupCommitPlanner =
EnvFactory.getInstance().createGroupCommitPlanner((Database) db, olapTable,
+ targetColumnNames, queryId,
ConnectContext.get().getSessionVariable().getGroupCommit());
// save plan message to be reused for prepare stmt
loadId = queryId;
baseSchemaVersion = olapTable.getBaseSchemaVersion();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java
index 37f3318a89e..a2f31818b8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java
@@ -18,7 +18,9 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -82,6 +84,15 @@ public class SetUserPropertyVar extends SetVar {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"GRANT");
}
+ if (Config.isCloudMode()) {
+ // check value, clusterName is valid.
+ if (key.equals(UserProperty.DEFAULT_CLOUD_CLUSTER)
+ && !Strings.isNullOrEmpty(value)
+ && !((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCloudClusterNames().contains(value)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
+ }
+ }
return;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index a2386b61392..1a93efa0c5a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -637,7 +637,7 @@ public class Env {
public Env(boolean isCheckpointCatalog) {
this.catalogMgr = new CatalogMgr();
this.load = new Load();
- this.routineLoadManager = new RoutineLoadManager();
+ this.routineLoadManager =
EnvFactory.getInstance().createRoutineLoadManager();
this.groupCommitManager = new GroupCommitManager();
this.sqlBlockRuleMgr = new SqlBlockRuleMgr();
this.exportMgr = new ExportMgr();
@@ -720,7 +720,7 @@ public class Env {
Config.async_loading_load_task_pool_size, LoadTask.COMPARATOR,
LoadTask.class, !isCheckpointCatalog);
this.loadJobScheduler = new LoadJobScheduler();
- this.loadManager = new LoadManager(loadJobScheduler);
+ this.loadManager =
EnvFactory.getInstance().createLoadManager(loadJobScheduler);
this.progressManager = new ProgressManager();
this.streamLoadRecordMgr = new
StreamLoadRecordMgr("stream_load_record_manager",
Config.fetch_stream_load_record_interval_second * 1000L);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index a286cf33cea..34e9d5ef93b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -24,10 +24,15 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.cloud.catalog.CloudEnvFactory;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.LoadJobScheduler;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
@@ -39,10 +44,15 @@ import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
+import org.apache.thrift.TException;
+
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
+// EnvFactory is responsible for creating non-cloud objects.
+// CloudEnvFactory is responsible for creating cloud objects.
+
public class EnvFactory {
public EnvFactory() {}
@@ -131,4 +141,17 @@ public class EnvFactory {
String timezone, boolean
loadZeroTolerance) {
return new Coordinator(jobId, queryId, descTable, fragments,
scanNodes, timezone, loadZeroTolerance);
}
+
+ /**
+ * Creates the planner used for group-commit inserts (e.g. from NativeInsertStmt).
+ * CloudEnvFactory overrides this to return a CloudGroupCommitPlanner.
+ */
+ public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable
table, List<String> targetColumnNames,
+ TUniqueId queryId, String groupCommit) throws UserException,
TException {
+ return new GroupCommitPlanner(db, table, targetColumnNames, queryId,
groupCommit);
+ }
+
+ /**
+ * Creates the routine load manager held by Env (called from the Env constructor).
+ * CloudEnvFactory overrides this to return a CloudRoutineLoadManager.
+ */
+ public RoutineLoadManager createRoutineLoadManager() {
+ return new RoutineLoadManager();
+ }
+
+ /**
+ * Creates the load manager held by Env (called from the Env constructor).
+ * CloudEnvFactory overrides this to return a CloudLoadManager.
+ */
+ public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) {
+ return new LoadManager(loadJobScheduler);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0130b78a9b9..463b3b1f19d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -476,6 +476,25 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
return null;
}
+    /**
+     * Returns the ids of all tablets of this table, across every partition and every
+     * materialized index returned for {@code IndexExtState.ALL}, collected under the table read lock.
+     * Best effort: if the traversal throws, the failure is logged (with stack trace) and the ids
+     * gathered so far are returned.
+     *
+     * @return a new, mutable list of tablet ids (empty if the table has no tablets)
+     */
+    public List<Long> getAllTabletIds() {
+        List<Long> tabletIds = new ArrayList<>();
+        // Acquire the lock BEFORE entering try: if lock() were inside try and failed, the finally
+        // block would call unlock() on a lock this thread does not hold and mask the real error.
+        rwLock.readLock().lock();
+        try {
+            for (Partition partition : getPartitions()) {
+                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                    for (Tablet tablet : index.getTablets()) {
+                        tabletIds.add(tablet.getId());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("get all tablet ids failed", e);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        return tabletIds;
+    }
+
public Map<Long, MaterializedIndexMeta> getVisibleIndexIdToMeta() {
Map<Long, MaterializedIndexMeta> visibleMVs = Maps.newHashMap();
List<MaterializedIndex> mvs = getVisibleIndex();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 0c73a19c7d5..35f5b14efc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -87,6 +87,7 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
// especially to reduce conflicts when obtaining delete bitmap update
locks for
// MoW table
protected ReentrantLock commitLock;
+
/*
* fullSchema and nameToColumn should contains all columns, both visible
and shadow.
* eg. for OlapTable, when doing schema change, there will be some shadow
columns which are not visible
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 00aef304cf3..19bd102c84c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -383,7 +383,7 @@ public class CloudEnv extends Env {
public void changeCloudCluster(String clusterName, ConnectContext ctx)
throws DdlException {
checkCloudClusterPriv(clusterName);
// TODO(merge-cloud): pick cloud auto start
- // waitForAutoStart(clusterName);
+ CloudSystemInfoService.waitForAutoStart(clusterName);
try {
((CloudSystemInfoService)
Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
} catch (UserException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 8990f2cd966..c7fb81fa6c6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -21,9 +21,11 @@ import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DynamicPartitionProperty;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
@@ -31,14 +33,22 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.common.util.CloudPropertyAnalyzer;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.load.CloudBrokerLoadJob;
+import org.apache.doris.cloud.load.CloudLoadManager;
+import org.apache.doris.cloud.load.CloudRoutineLoadManager;
+import org.apache.doris.cloud.planner.CloudGroupCommitPlanner;
import org.apache.doris.cloud.qe.CloudCoordinator;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.LoadJobScheduler;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
@@ -49,6 +59,8 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
+import org.apache.thrift.TException;
+
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
@@ -139,14 +151,32 @@ public class CloudEnvFactory extends EnvFactory {
return new CloudBrokerLoadJob();
}
+ // Cloud mode: build a CloudCoordinator instead of the default Coordinator.
+ @Override
public Coordinator createCoordinator(ConnectContext context, Analyzer
analyzer, Planner planner,
StatsErrorEstimator
statsErrorEstimator) {
return new CloudCoordinator(context, analyzer, planner,
statsErrorEstimator);
}
+ // Cloud mode: build a CloudCoordinator for load jobs instead of the default Coordinator.
+ @Override
public Coordinator createCoordinator(Long jobId, TUniqueId queryId,
DescriptorTable descTable,
List<PlanFragment> fragments,
List<ScanNode> scanNodes,
String timezone, boolean
loadZeroTolerance) {
return new CloudCoordinator(jobId, queryId, descTable, fragments,
scanNodes, timezone, loadZeroTolerance);
}
+
+ // Cloud mode: group-commit inserts are planned by CloudGroupCommitPlanner.
+ @Override
+ public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable
table, List<String> targetColumnNames,
+ TUniqueId queryId, String groupCommit) throws UserException,
TException {
+ return new CloudGroupCommitPlanner(db, table, targetColumnNames,
queryId, groupCommit);
+ }
+
+ // Cloud mode: routine load jobs are bound to a cloud cluster by CloudRoutineLoadManager.
+ @Override
+ public RoutineLoadManager createRoutineLoadManager() {
+ return new CloudRoutineLoadManager();
+ }
+
+ // Cloud mode: LOAD / INSERT job creation goes through CloudLoadManager.
+ @Override
+ public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) {
+ return new CloudLoadManager(loadJobScheduler);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index 52b52fd70ae..814e6020362 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -22,25 +22,39 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.loadv2.BrokerLoadJob;
import org.apache.doris.load.loadv2.BrokerPendingTaskAttachment;
+import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadLoadingTask;
+import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
public class CloudBrokerLoadJob extends BrokerLoadJob {
private static final Logger LOG =
LogManager.getLogger(CloudBrokerLoadJob.class);
@@ -48,6 +62,8 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
protected static final String CLOUD_CLUSTER_ID = "clusterId";
protected String cloudClusterId;
+ private int retryTimes = 3;
+
public CloudBrokerLoadJob() {
}
@@ -96,6 +112,13 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
}
}
+ // override BulkLoadJob.analyze
+ // Restores cloudClusterId from the job's session variables (key CLOUD_CLUSTER_ID = "clusterId")
+ // before running the parent analysis. onTaskFailed() falls back to the default behavior when
+ // cloudClusterId is empty, so this value decides whether cloud retry handling applies.
+ // NOTE(review): presumably "clusterId" was recorded into sessionVariables when the job was
+ // created; if it is absent, cloudClusterId ends up null/empty — confirm against the creator.
+ @Override
+ public void analyze() {
+ cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
+ super.analyze();
+ }
+
@Override
protected LoadLoadingTask createTask(Database db, OlapTable table,
List<BrokerFileGroup> brokerFileGroups,
boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey,
BrokerPendingTaskAttachment attachment)
@@ -127,4 +150,162 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
// in fe. So pint an edit log to save the status information of the
job here.
logFinalOperation();
}
+
+    /**
+     * Hook invoked after a loading task has committed its transaction.
+     * When the session variable enableMultiClusterSyncLoad is on, asks the backends of every cloud
+     * cluster other than the one that ran this load (cloudClusterId) to sync all tablets of the
+     * loaded tables via StmtExecutor.syncLoadForTablets.
+     *
+     * @param tableList the tables written by this load; each entry is cast to OlapTable
+     */
+    @Override
+    protected void afterLoadingTaskCommitTransaction(List<Table> tableList) {
+        // This can run on a thread with no session. In that case install a temporary context only
+        // for the duration of this call and remove it afterwards, so it does not leak into the
+        // thread-local of whatever thread executes the next piece of work.
+        ConnectContext ctx = ConnectContext.get();
+        boolean temporaryCtx = ctx == null;
+        if (temporaryCtx) {
+            ctx = new ConnectContext();
+            ctx.setThreadLocalInfo();
+        }
+        try {
+            if (!ctx.getSessionVariable().enableMultiClusterSyncLoad()) {
+                return;
+            }
+            // get the backends of each cluster except the cluster that executed this load
+            CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
+            List<List<Backend>> backendsList = infoService.getCloudClusterIds()
+                    .stream()
+                    .filter(clusterId -> !clusterId.equals(cloudClusterId))
+                    .map(clusterId -> infoService.getBackendsByClusterId(clusterId))
+                    .collect(Collectors.toList());
+            // for each loaded table, ask those backends to sync all of its tablets
+            for (Table table : tableList) {
+                List<Long> allTabletIds = ((OlapTable) table).getAllTabletIds();
+                StmtExecutor.syncLoadForTablets(backendsList, allTabletIds);
+            }
+        } finally {
+            if (temporaryCtx) {
+                ConnectContext.remove();
+            }
+        }
+    }
+
+ /**
+ * Cloud-mode task failure handling: instead of cancelling the job on the first failed task,
+ * the whole job is retried while retryTimes (initially 3) has not run out.
+ * <ul>
+ * <li>Jobs without a cloud cluster id use the default BrokerLoadJob behavior.</li>
+ * <li>Failures reported after the transaction is done are logged and ignored.</li>
+ * <li>When retries are exhausted the job is cancelled (its txn is aborted only if
+ * transactionId > 0) and logFinalOperation() is called.</li>
+ * <li>Otherwise the current attempt is torn down by unprotectedExecuteRetry(), this thread
+ * polls once per second until every other task of the job reports done, and the job is reset
+ * to PENDING with a fresh pending task scheduled to start 60 seconds later.</li>
+ * </ul>
+ * NOTE(review): the poll loop has no upper bound, so this call blocks until every other task
+ * reports isDone(); confirm tasks whose coordinators were cancelled always reach that state.
+ * NOTE(review): InterruptedException is logged and swallowed, so the loop keeps polling even if
+ * this thread is interrupted; consider restoring the interrupt status or bailing out.
+ * NOTE(review): the write lock is released between unprotectedExecuteRetry() and the reset to
+ * PENDING; confirm a concurrent onTaskFailed() for another task of this job cannot decrement
+ * retryTimes again or submit a second pending task in that window.
+ */
+ @Override
+ public void onTaskFailed(long taskId, FailMsg failMsg) {
+ if (Strings.isNullOrEmpty(this.cloudClusterId)) {
+ super.onTaskFailed(taskId, failMsg);
+ return;
+ }
+
+ try {
+ writeLock();
+ if (isTxnDone()) {
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("label", label)
+ .add("transactionId", transactionId)
+ .add("state", state)
+ .add("error_msg", "this task will be ignored when job
is: " + state)
+ .build());
+ return;
+ }
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("label", label)
+ .add("transactionId", transactionId)
+ .add("state", state)
+ .add("retryTimes", retryTimes)
+ .add("failMsg", failMsg.getMsg())
+ .build());
+
+ this.retryTimes--;
+ if (this.retryTimes <= 0) {
+ boolean abortTxn = this.transactionId > 0 ? true : false;
+ unprotectedExecuteCancel(failMsg, abortTxn);
+ logFinalOperation();
+ return;
+ } else {
+ unprotectedExecuteRetry(failMsg);
+ }
+ } finally {
+ writeUnlock();
+ }
+
+ boolean allTaskDone = false;
+ while (!allTaskDone) {
+ try {
+ writeLock();
+ // check whether every task other than the failed one has finished;
+ // unprotectedExecuteRetry() cancelled the coordinators of all running loading tasks
+ allTaskDone = true;
+ for (Map.Entry<Long, LoadTask> entry : idToTasks.entrySet()) {
+ if (entry.getKey() != taskId &&
!entry.getValue().isDone()) {
+ LOG.info("LoadTask({}) has not been done",
entry.getKey());
+ allTaskDone = false;
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ if (!allTaskDone) {
+ try {
+ Thread.sleep(1000);
+ continue;
+ } catch (InterruptedException e) {
+ LOG.warn("", e);
+ }
+ }
+ }
+
+ try {
+ writeLock();
+ this.state = JobState.PENDING;
+ this.idToTasks.clear();
+ this.failMsg = null;
+ this.finishedTaskIds.clear();
+
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
+ LoadTask task = createPendingTask();
+ // delay the retry by 60 seconds by default, because restarting a BE is slow
+ task.setStartTimeMs(System.currentTimeMillis() + 60 * 1000);
+ idToTasks.put(task.getSignature(), task);
+ Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+    /**
+     * Tears down the current load attempt so the job can be retried. Callers must hold the job
+     * write lock (onTaskFailed() calls this between writeLock() and writeUnlock()).
+     * Records the failure and finish timestamp, unregisters the transaction callback, aborts the
+     * transaction by label, cancels the coordinators of all loading tasks, and finally moves the
+     * job to {@link JobState#RETRY}.
+     *
+     * @param failMsg the failure that triggered the retry
+     */
+    protected void unprotectedExecuteRetry(FailMsg failMsg) {
+        LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id", transactionId)
+                .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()).build());
+
+        // get load ids of all loading tasks, we will cancel their coordinator process later
+        List<TUniqueId> loadIds = Lists.newArrayList();
+        for (LoadTask loadTask : idToTasks.values()) {
+            if (loadTask instanceof LoadLoadingTask) {
+                loadIds.add(((LoadLoadingTask) loadTask).getLoadId());
+            }
+        }
+
+        // set failMsg and state
+        this.failMsg = failMsg;
+        if (failMsg.getCancelType() == CancelType.TXN_UNKNOWN) {
+            // for bug fix, see LoadManager's fixLoadJobMetaBugs() method
+            finishTimestamp = createTimestamp;
+        } else {
+            finishTimestamp = System.currentTimeMillis();
+        }
+
+        // remove callback before abortTransaction(), so that the afterAborted() callback will not be called again
+        Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
+        // abort txn by label, because transactionId here may still be -1
+        try {
+            // guard so the LogBuilder message is only built when debug logging is actually on
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id)
+                        .add("label", label)
+                        .add("msg", "begin to abort txn")
+                        .build());
+            }
+            Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, label, failMsg.getMsg());
+        } catch (UserException e) {
+            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                    .add("label", label)
+                    .add("error_msg", "failed to abort txn when job is cancelled. " + e.getMessage())
+                    .build());
+        }
+
+        // cancel all running coordinators, so that the scheduler's worker thread will be released
+        for (TUniqueId loadId : loadIds) {
+            Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
+            if (coordinator != null) {
+                coordinator.cancel();
+            }
+        }
+
+        // change state
+        state = JobState.RETRY;
+    }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
new file mode 100644
index 00000000000..1d0bfc23f6c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.load;
+
+import org.apache.doris.analysis.InsertStmt;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.LoadJobScheduler;
+import org.apache.doris.load.loadv2.LoadManager;
+
+/**
+ * LoadManager used in cloud mode (returned by CloudEnvFactory.createLoadManager).
+ * Before a load job is created from a LOAD or INSERT statement, it calls
+ * CloudSystemInfoService.waitForAutoStartCurrentCluster() and then delegates to LoadManager.
+ * NOTE(review): going by its name, that call presumably blocks until the session's current cloud
+ * cluster has auto-started — confirm in CloudSystemInfoService.
+ */
+public class CloudLoadManager extends LoadManager {
+
+ public CloudLoadManager(LoadJobScheduler loadJobScheduler) {
+ super(loadJobScheduler);
+ }
+
+ // LOAD statement path: wait for the current cluster, then create the job as usual.
+ @Override
+ public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException,
UserException {
+ CloudSystemInfoService.waitForAutoStartCurrentCluster();
+
+ return super.createLoadJobFromStmt(stmt);
+ }
+
+ // INSERT statement path: wait for the current cluster, then create the job as usual.
+ @Override
+ public long createLoadJobFromStmt(InsertStmt stmt) throws DdlException {
+ CloudSystemInfoService.waitForAutoStartCurrentCluster();
+
+ return super.createLoadJobFromStmt(stmt);
+ }
+
+}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
new file mode 100644
index 00000000000..fd10a3bc467
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.load;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * RoutineLoadManager used in cloud mode (returned by CloudEnvFactory.createRoutineLoadManager).
+ * Each routine load job is bound to the cloud cluster of the session that created it, and its
+ * tasks are only scheduled onto alive backends of that cluster.
+ */
+public class CloudRoutineLoadManager extends RoutineLoadManager {
+    private static final Logger LOG = LogManager.getLogger(CloudRoutineLoadManager.class);
+
+    /**
+     * Binds the job to the current session's cloud cluster, then registers it.
+     *
+     * @throws UserException if there is no current session or it has no cloud cluster selected
+     */
+    @Override
+    public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName)
+            throws UserException {
+        // ConnectContext.get() is a thread-local lookup and can be null off a session thread;
+        // report the missing cluster as a UserException instead of failing with an NPE.
+        ConnectContext ctx = ConnectContext.get();
+        String cloudCluster = ctx == null ? null : ctx.getCloudCluster();
+        if (Strings.isNullOrEmpty(cloudCluster)) {
+            throw new UserException("cloud cluster is empty, please specify cloud cluster");
+        }
+        routineLoadJob.setCloudCluster(cloudCluster);
+        super.addRoutineLoadJob(routineLoadJob, dbName, tableName);
+    }
+
+    /**
+     * Returns the ids of the alive backends in the cloud cluster the job is bound to.
+     *
+     * @throws LoadException if the job is unknown or has no cloud cluster id
+     */
+    @Override
+    protected List<Long> getAvailableBackendIds(long jobId) throws LoadException {
+        RoutineLoadJob routineLoadJob = getJob(jobId);
+        if (routineLoadJob == null) {
+            // NOTE(review): assumes getJob returns null for an unknown / removed job id
+            LOG.warn("routine load job {} not found", jobId);
+            throw new LoadException("routine load job " + jobId + " not found");
+        }
+        String cloudClusterId = routineLoadJob.getCloudClusterId();
+        if (Strings.isNullOrEmpty(cloudClusterId)) {
+            LOG.warn("cluster id is empty");
+            throw new LoadException("cluster id is empty");
+        }
+
+        return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                .getBackendsByClusterId(cloudClusterId)
+                .stream()
+                .filter(Backend::isAlive)
+                .map(Backend::getId)
+                .collect(Collectors.toList());
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
new file mode 100644
index 00000000000..8480055e8b9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.planner;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.GroupCommitPlanner;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class CloudGroupCommitPlanner extends GroupCommitPlanner {
+    private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class);
+
+    public CloudGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId,
+            String groupCommit)
+            throws UserException, TException {
+        super(db, table, targetColumnNames, queryId, groupCommit);
+    }
+
+    /**
+     * Chooses the backend used for group commit of this table in the session's cloud cluster.
+     *
+     * <p>The backend cached on the session for this table is reused when it is alive, not decommissioned and
+     * belongs to the session's cloud cluster. Otherwise a random active, non-decommissioned backend of that
+     * cluster is picked and cached on the session.
+     *
+     * @param ctx current connect context
+     * @throws DdlException if no cloud cluster is selected or the cluster has no suitable backend
+     */
+    @Override
+    protected void selectBackends(ConnectContext ctx) throws DdlException {
+        String cluster = ctx.getCloudCluster();
+
+        backend = ctx.getInsertGroupCommit(this.table.getId());
+        // Compare from the session side so a cached backend without a cluster name cannot cause an NPE.
+        if (backend != null && backend.isAlive() && !backend.isDecommissioned()
+                && cluster != null && cluster.equals(backend.getCloudClusterName())) {
+            return;
+        }
+
+        if (Strings.isNullOrEmpty(cluster)) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
+        }
+
+        // select be
+        List<Backend> candidates = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster)
+                .values().stream().collect(Collectors.toList());
+        Collections.shuffle(candidates);
+        for (Backend candidate : candidates) {
+            if (candidate.isActive() && !candidate.isDecommissioned()) {
+                this.backend = candidate;
+                ctx.setInsertGroupCommit(this.table.getId(), candidate);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("choose new be {}", candidate.getId());
+                }
+                return;
+            }
+        }
+
+        throw new DdlException("No suitable backend for cloud cluster=" + cluster);
+    }
+
+}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
index 02f1f906b3b..2ce8950c12f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
@@ -61,7 +61,7 @@ public class CloudCoordinator extends Coordinator {
}
@Override
- protected void prepare() throws Exception {
+ protected void prepare() throws UserException {
String cluster = null;
ConnectContext context = ConnectContext.get();
if (context != null) {
@@ -96,7 +96,7 @@ public class CloudCoordinator extends Coordinator {
LOG.warn("no available backends, idToBackend {}", idToBackend);
String clusterName = ConnectContext.get() != null
? ConnectContext.get().getCloudCluster() : "ctx empty cant
get clusterName";
- throw new Exception("no available backends, the cluster maybe not
be set or been dropped clusterName = "
+ throw new UserException("no available backends, the cluster maybe
not be set or been dropped clusterName = "
+ clusterName);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index ae43da4b244..6e7ca874f6e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.system;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.ClusterPB;
import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
@@ -30,6 +31,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
@@ -310,6 +312,33 @@ public class CloudSystemInfoService extends
SystemInfoService {
return clusterNameToId.containsKey(clusterName);
}
+    /**
+     * Lists the backends of the cloud cluster selected by the current session.
+     *
+     * <p>Before listing, the cluster name is passed to {@code CloudEnv.checkCloudClusterPriv}.
+     *
+     * @return backends of the session's current cloud cluster
+     * @throws UserException if there is no connect context or no cluster name is set
+     */
+    @Override
+    public List<Backend> getBackendsByCurrentCluster() throws UserException {
+        final ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            throw new UserException("connect context is null");
+        }
+
+        final String clusterName = context.getCurrentCloudCluster();
+        if (Strings.isNullOrEmpty(clusterName)) {
+            throw new UserException("cluster name is empty");
+        }
+
+        CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv();
+        cloudEnv.checkCloudClusterPriv(clusterName);
+        return getBackendsByClusterName(clusterName);
+    }
+
+    /**
+     * Returns the backends of the session's current cloud cluster, indexed by backend id.
+     * If two entries share an id, the later one wins.
+     *
+     * @throws UserException propagated from {@code getBackendsByCurrentCluster()}
+     */
+    @Override
+    public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() throws UserException {
+        Map<Long, Backend> byId = Maps.newHashMap();
+        getBackendsByCurrentCluster().forEach(be -> byId.put(be.getId(), be));
+        return ImmutableMap.copyOf(byId);
+    }
+
public List<Backend> getBackendsByClusterName(final String clusterName) {
String clusterId = clusterNameToId.getOrDefault(clusterName, "");
if (clusterId.isEmpty()) {
@@ -540,9 +569,19 @@ public class CloudSystemInfoService extends
SystemInfoService {
this.instanceStatus = instanceStatus;
}
+    /**
+     * Delegates to {@code waitForAutoStart} for the cloud cluster of the current session.
+     * Does nothing when there is no connect context or the session has no cloud cluster.
+     *
+     * @throws DdlException propagated from {@code waitForAutoStart}
+     */
+    public static void waitForAutoStartCurrentCluster() throws DdlException {
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            return;
+        }
+        String cloudCluster = context.getCloudCluster();
+        if (Strings.isNullOrEmpty(cloudCluster)) {
+            return;
+        }
+        waitForAutoStart(cloudCluster);
+    }
+
public static void waitForAutoStart(final String clusterName) throws
DdlException {
- // TODO: merge from cloud.
- throw new DdlException("Env.waitForAutoStart unimplemented");
+ // TODO(merge-cloud): merge from cloud.
+ // throw new DdlException("Env.waitForAutoStart unimplemented");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index a48af1c4979..bf8cc592c20 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -990,7 +990,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
@Override
public void updateMultiTableRunningTransactionTableIds(Long dbId, Long
transactionId, List<Long> tableIds)
throws UserException {
- throw new UserException(NOT_SUPPORTED_MSG);
+ //throw new UserException(NOT_SUPPORTED_MSG);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index 1f9ff87647d..183c733097b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1213,7 +1213,10 @@ public enum ErrorCode {
ERR_CLOUD_CLUSTER_ERROR(5098, new byte[]{'4', '2', '0', '0', '0'},
"Cluster %s not exist, use SQL 'SHOW CLUSTERS' to get a valid
cluster"),
- ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No
cluster selected");
+ ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No
cluster selected"),
+
+ ERR_NOT_CLOUD_MODE(6000, new byte[]{'4', '2', '0', '0', '0'},
+ "Command only support in cloud mode.");
// This is error code
private final int code;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index aa616372184..d41fab5916c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -46,7 +46,8 @@ public abstract class ExternalScanNode extends ScanNode {
protected final FederationBackendPolicy backendPolicy =
(ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableFileCache)
- ? new
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new
FederationBackendPolicy();
+ ? new
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
+ : new FederationBackendPolicy();
public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 73a49bb24a8..e3e3405a1b1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -71,7 +71,7 @@ import java.util.stream.Collectors;
public class FederationBackendPolicy {
private static final Logger LOG =
LogManager.getLogger(FederationBackendPolicy.class);
- private final List<Backend> backends = Lists.newArrayList();
+ protected final List<Backend> backends = Lists.newArrayList();
private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
public Map<Backend, Long> getAssignedWeightPerBackend() {
@@ -80,7 +80,7 @@ public class FederationBackendPolicy {
private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap();
- private ConsistentHash<Split, Backend> consistentHash;
+ protected ConsistentHash<Split, Backend> consistentHash;
private int nextBe = 0;
private boolean initialized = false;
@@ -184,7 +184,7 @@ public class FederationBackendPolicy {
}
public void init(BeSelectionPolicy policy) throws UserException {
-
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
+
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getBackendsByCurrentCluster()));
if (backends.isEmpty()) {
throw new UserException("No available backends");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 6be5654a2ea..2b37cf997d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -20,9 +20,12 @@ package org.apache.doris.httpv2.rest;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
@@ -30,6 +33,7 @@ import
org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
@@ -39,6 +43,7 @@ import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -50,8 +55,11 @@ import
org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
import java.net.URI;
+import java.security.SecureRandom;
import java.util.List;
+import java.util.Random;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -149,8 +157,7 @@ public class LoadAction extends RestBaseController {
}
String label = request.getHeader(LABEL_KEY);
- TNetworkAddress redirectAddr;
- redirectAddr = selectRedirectBackend(groupCommit);
+ TNetworkAddress redirectAddr = selectRedirectBackend(request,
groupCommit);
LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);
@@ -228,6 +235,7 @@ public class LoadAction extends RestBaseController {
// we return error by using RestBaseResult.
private Object executeWithoutPassword(HttpServletRequest request,
HttpServletResponse response, String db, String table, boolean
isStreamLoad, boolean groupCommit) {
+ String label = null;
try {
String dbName = db;
String tableName = table;
@@ -246,11 +254,7 @@ public class LoadAction extends RestBaseController {
String fullDbName = dbName;
- String label = request.getParameter(LABEL_KEY);
- if (isStreamLoad) {
- label = request.getHeader(LABEL_KEY);
- }
-
+ label = isStreamLoad ? request.getHeader(LABEL_KEY) :
request.getParameter(LABEL_KEY);
if (!isStreamLoad && Strings.isNullOrEmpty(label)) {
// for stream load, the label can be generated by system
automatically
return new RestBaseResult("No label selected.");
@@ -273,7 +277,7 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult(e.getMessage());
}
} else {
- redirectAddr = selectRedirectBackend(groupCommit);
+ redirectAddr = selectRedirectBackend(request, groupCommit);
}
LOG.info("redirect load action to destination={}, stream: {}, db:
{}, tbl: {}, label: {}",
@@ -282,6 +286,8 @@ public class LoadAction extends RestBaseController {
RedirectView redirectView = redirectTo(request, redirectAddr);
return redirectView;
} catch (Exception e) {
+ LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {},
err: {}",
+ isStreamLoad, db, table, label, e.getMessage());
return new RestBaseResult(e.getMessage());
}
}
@@ -304,7 +310,7 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult("No transaction operation(\'commit\'
or \'abort\') selected.");
}
- TNetworkAddress redirectAddr = selectRedirectBackend(false);
+ TNetworkAddress redirectAddr = selectRedirectBackend(request,
false);
LOG.info("redirect stream load 2PC action to destination={}, db:
{}, txn: {}, operation: {}",
redirectAddr.toString(), dbName,
request.getHeader(TXN_ID_KEY), txnOperation);
@@ -322,7 +328,35 @@ public class LoadAction extends RestBaseController {
return index;
}
- private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws
LoadException {
+    /**
+     * Resolves the cloud cluster for a load request.
+     *
+     * <p>The {@code SessionVariable.CLOUD_CLUSTER} request header takes precedence. Otherwise the cloud cluster
+     * of the current connect context is used, if any.
+     *
+     * @param request incoming HTTP request
+     * @return the cluster name, or an empty string when none is set
+     */
+    private String getCloudClusterName(HttpServletRequest request) {
+        String cloudClusterName = request.getHeader(SessionVariable.CLOUD_CLUSTER);
+        if (!Strings.isNullOrEmpty(cloudClusterName)) {
+            return cloudClusterName;
+        }
+
+        // Tolerate a missing thread-local context instead of failing with NullPointerException.
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx != null) {
+            cloudClusterName = ctx.getCloudCluster();
+            if (!Strings.isNullOrEmpty(cloudClusterName)) {
+                return cloudClusterName;
+            }
+        }
+
+        return "";
+    }
+
+    /**
+     * Chooses the backend address a load request is redirected to.
+     *
+     * <p>Outside cloud mode this defers to {@code selectLocalRedirectBackend}. In cloud mode the request must
+     * resolve to a cloud cluster, and the request's Host header is passed along for endpoint selection.
+     *
+     * @throws LoadException if no cloud cluster is selected in cloud mode, or backend selection fails
+     */
+    private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit)
+            throws LoadException {
+        if (!Config.isCloudMode()) {
+            return selectLocalRedirectBackend(groupCommit);
+        }
+
+        String cloudClusterName = getCloudClusterName(request);
+        if (Strings.isNullOrEmpty(cloudClusterName)) {
+            throw new LoadException("No cloud cluster name selected.");
+        }
+        String hostHeader = request.getHeader(HttpHeaderNames.HOST.toString());
+        return selectCloudRedirectBackend(cloudClusterName, hostHeader, groupCommit);
+    }
+
+ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit)
throws LoadException {
Backend backend = null;
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -358,6 +392,89 @@ public class LoadAction extends RestBaseController {
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
+    /**
+     * Picks a random alive backend of the given cloud cluster and computes the address to redirect to.
+     *
+     * <p>The address is chosen as follows:
+     * <ul>
+     *   <li>If the Host header is an IP literal equal to the backend's public endpoint host, the public
+     *       endpoint is returned.</li>
+     *   <li>Otherwise, if the backend has a private endpoint, the request host is combined with the private
+     *       endpoint's port.</li>
+     *   <li>Otherwise the backend's own host and HTTP port are used.</li>
+     * </ul>
+     *
+     * @param clusterName cloud cluster to choose a backend from
+     * @param reqHostStr  value of the request's Host header, {@code host} or {@code host:port}; may be null
+     * @param groupCommit when true, decommissioned backends are excluded
+     * @throws LoadException if no backend is available, an endpoint is malformed, or the Host header is invalid
+     */
+    private TNetworkAddress selectCloudRedirectBackend(String clusterName, String reqHostStr, boolean groupCommit)
+            throws LoadException {
+        List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                .getBackendsByClusterName(clusterName)
+                .stream().filter(be -> be.isAlive() && (!groupCommit || !be.isDecommissioned()))
+                .collect(Collectors.toList());
+
+        if (backends.isEmpty()) {
+            LOG.warn("No available backend for stream load redirect, cluster name {}", clusterName);
+            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", cluster: " + clusterName);
+        }
+
+        Random rand = new SecureRandom();
+        Backend backend = backends.get(rand.nextInt(backends.size()));
+
+        Pair<String, Integer> publicHostPort = null;
+        Pair<String, Integer> privateHostPort = null;
+        try {
+            if (!Strings.isNullOrEmpty(backend.getCloudPublicEndpoint())) {
+                publicHostPort = splitHostAndPort(backend.getCloudPublicEndpoint());
+            }
+            if (!Strings.isNullOrEmpty(backend.getCloudPrivateEndpoint())) {
+                privateHostPort = splitHostAndPort(backend.getCloudPrivateEndpoint());
+            }
+        } catch (AnalysisException e) {
+            throw new LoadException(e.getMessage());
+        }
+
+        // A missing Host header is treated like an empty one instead of failing with NullPointerException.
+        String hostHeader = reqHostStr == null ? "" : reqHostStr.replaceAll("\\s+", "");
+        if (hostHeader.isEmpty()) {
+            LOG.info("Invalid header host: {}", hostHeader);
+            throw new LoadException("Invalid header host: " + hostHeader);
+        }
+
+        String[] pair = hostHeader.split(":");
+        if (pair.length != 1 && pair.length != 2) {
+            LOG.info("Invalid header host: {}", hostHeader);
+            // Report the offending header value, not the not-yet-parsed host.
+            throw new LoadException("Invalid header host: " + hostHeader);
+        }
+        String reqHost = pair[0];
+
+        // Compare by value: comparing the two freshly split Strings with == could never match.
+        if (InetAddressValidator.getInstance().isValid(reqHost)
+                && publicHostPort != null && reqHost.equals(publicHostPort.first)) {
+            return new TNetworkAddress(publicHostPort.first, publicHostPort.second);
+        } else if (privateHostPort != null) {
+            // NOTE(review): keeps the client-facing host and only swaps in the private endpoint port;
+            // presumably that host routes to the backend's private network — confirm.
+            return new TNetworkAddress(reqHost, privateHostPort.second);
+        } else {
+            return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+        }
+    }
+
+    /**
+     * Parses an endpoint of the form {@code host:port}. Whitespace anywhere in the input is ignored.
+     *
+     * @param hostPort endpoint string; must not be null
+     * @return the host and the port
+     * @throws AnalysisException if the endpoint is empty, not exactly {@code host:port}, or the port is not an
+     *                           integer in [1, 65535]
+     */
+    private Pair<String, Integer> splitHostAndPort(String hostPort) throws AnalysisException {
+        hostPort = hostPort.replaceAll("\\s+", "");
+        if (hostPort.isEmpty()) {
+            LOG.info("empty endpoint");
+            throw new AnalysisException("empty endpoint: " + hostPort);
+        }
+
+        String[] pair = hostPort.split(":");
+        if (pair.length != 2) {
+            LOG.info("Invalid endpoint: {}", hostPort);
+            throw new AnalysisException("Invalid endpoint: " + hostPort);
+        }
+
+        int port;
+        try {
+            port = Integer.parseInt(pair[1]);
+        } catch (NumberFormatException e) {
+            // Report a non-numeric port through the declared checked exception instead of an unchecked one.
+            LOG.info("Invalid endpoint port: {}", pair[1]);
+            throw new AnalysisException("Invalid endpoint port: " + pair[1]);
+        }
+        if (port <= 0 || port >= 65536) {
+            LOG.info("Invalid endpoint port: {}", pair[1]);
+            throw new AnalysisException("Invalid endpoint port: " + pair[1]);
+        }
+
+        return Pair.of(pair[0], port);
+    }
+
// NOTE: This function can only be used for AuditlogPlugin stream load for
now.
// AuditlogPlugin should be re-disigned carefully, and blow method focuses
on
// temporarily addressing the users' needs for audit logs.
@@ -410,7 +527,7 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult("No label selected.");
}
- TNetworkAddress redirectAddr = selectRedirectBackend(false);
+ TNetworkAddress redirectAddr = selectRedirectBackend(request,
false);
LOG.info("Redirect load action with auth token to destination={},"
+ "stream: {}, db: {}, tbl: {}, label: {}",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java
index ee09946f4fc..a5c915e0bbc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java
@@ -18,6 +18,7 @@
package org.apache.doris.httpv2.rest.manager;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
@@ -73,4 +74,40 @@ public class ClusterAction extends RestBaseController {
.collect(Collectors.toList()));
return ResponseEntityBuilder.ok(result);
}
+
+ /**
+ * One backend entry in the /cluster_info/cloud_cluster_status response.
+ * Fields are copied from a Backend (host, ports and BackendStatus fragment statistics) by cloudClusterInfo().
+ */
+ public static class BeClusterInfo {
+ public volatile String host;
+ public volatile int heartbeatPort;
+ public volatile int bePort;
+ public volatile int httpPort;
+ public volatile int brpcPort;
+ // copied from Backend.getBackendStatus().currentFragmentNum
+ public volatile long currentFragmentNum = 0;
+ // copied from Backend.getBackendStatus().lastFragmentUpdateTime
+ public volatile long lastFragmentUpdateTime = 0;
+ }
+
+    /**
+     * GET /cluster_info/cloud_cluster_status.
+     *
+     * <p>Checks the request's password and the current user's global ADMIN privilege. Then returns, for each
+     * cloud cluster known to the system info service, the host, ports and fragment statistics of its backends.
+     */
+    @RequestMapping(path = "/cluster_info/cloud_cluster_status", method = RequestMethod.GET)
+    public Object cloudClusterInfo(HttpServletRequest request, HttpServletResponse response) {
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
+
+        // Key: cloud cluster id (the key of getCloudClusterIdToBackend()), Value: status of each backend in it
+        Map<String, List<BeClusterInfo>> result = Maps.newHashMap();
+
+        ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdToBackend()
+                .forEach((clusterId, backends) -> {
+                    List<BeClusterInfo> bis = backends.stream().map(backend -> {
+                        BeClusterInfo bi = new BeClusterInfo();
+                        bi.host = backend.getHost();
+                        bi.heartbeatPort = backend.getHeartbeatPort();
+                        bi.bePort = backend.getBePort();
+                        bi.httpPort = backend.getHttpPort();
+                        bi.brpcPort = backend.getBrpcPort();
+                        bi.currentFragmentNum = backend.getBackendStatus().currentFragmentNum;
+                        bi.lastFragmentUpdateTime = backend.getBackendStatus().lastFragmentUpdateTime;
+                        return bi;
+                    }).collect(Collectors.toList());
+                    result.put(clusterId, bis);
+                });
+
+        return ResponseEntityBuilder.ok(result);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index abf0500b90e..52007d86239 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -118,12 +118,15 @@ public class BrokerLoadJob extends BulkLoadJob {
@Override
protected void unprotectedExecuteJob() {
- LoadTask task = new BrokerLoadPendingTask(this,
fileGroupAggInfo.getAggKeyToFileGroups(),
- brokerDesc, getPriority());
+ LoadTask task = createPendingTask();
idToTasks.put(task.getSignature(), task);
Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
}
+ /**
+ * Builds the pending task that unprotectedExecuteJob() registers and submits to the pending load task scheduler.
+ * The default is a BrokerLoadPendingTask over this job's aggregated file groups, broker desc and priority.
+ * NOTE(review): protected presumably so cloud subclasses (e.g. CloudBrokerLoadJob) can supply their own task.
+ */
+ protected LoadTask createPendingTask() {
+ return new BrokerLoadPendingTask(this,
fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc, getPriority());
+ }
+
/**
* Situation1: When attachment is instance of BrokerPendingTaskAttachment,
* this method is called by broker pending task.
@@ -312,7 +315,11 @@ public class BrokerLoadJob extends BulkLoadJob {
try {
db = getDb();
tableList =
db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds()));
- MetaLockUtils.writeLockTablesOrMetaException(tableList);
+ if (Config.isCloudMode()) {
+ MetaLockUtils.commitLockTables(tableList);
+ } else {
+ MetaLockUtils.writeLockTablesOrMetaException(tableList);
+ }
} catch (MetaNotFoundException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("database_id", dbId)
@@ -330,6 +337,7 @@ public class BrokerLoadJob extends BulkLoadJob {
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress,
loadStartTimestamp,
finishTimestamp, state, failMsg));
+ afterLoadingTaskCommitTransaction(tableList);
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("database_id", dbId)
@@ -337,10 +345,18 @@ public class BrokerLoadJob extends BulkLoadJob {
.build(), e);
cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
} finally {
- MetaLockUtils.writeUnlockTables(tableList);
+ if (Config.isCloudMode()) {
+ MetaLockUtils.commitUnlockTables(tableList);
+ } else {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
}
}
+ /**
+ * Hook called right after the loading-finished path commits the transaction, inside the try block and
+ * before the finally block releases the table locks.
+ * No-op here; intended as an override point for cloud mode.
+ *
+ * @param tableList tables locked for the commit
+ */
+ protected void afterLoadingTaskCommitTransaction(List<Table> tableList) {
+ }
+
private void writeProfile() {
if (!enableProfile) {
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
index e023b686c0c..10151694947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -25,7 +25,8 @@ public enum JobState {
LOADING, // job is running
COMMITTED, // transaction is committed but not visible
FINISHED, // transaction is visible and job is finished
- CANCELLED; // transaction is aborted and job is cancelled
+ CANCELLED, // transaction is aborted and job is cancelled
+ RETRY;
public boolean isFinalState() {
return this == FINISHED || this == CANCELLED;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c15904be8ef..55b61b1e981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -250,9 +250,10 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
*/
abstract Set<String> getTableNames() throws MetaNotFoundException;
- // return true if the corresponding transaction is done(COMMITTED,
FINISHED, CANCELLED)
+ // return true if the corresponding transaction is done(COMMITTED,
FINISHED, CANCELLED, RETRY)
public boolean isTxnDone() {
- return state == JobState.COMMITTED || state == JobState.FINISHED ||
state == JobState.CANCELLED;
+ return state == JobState.COMMITTED || state == JobState.FINISHED
+ || state == JobState.CANCELLED || state == JobState.RETRY;
}
// return true if job is done(FINISHED/CANCELLED/UNKNOWN)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index eef73542ed4..8c56547ff8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -130,6 +130,7 @@ public class LoadLoadingTask extends LoadTask {
protected void executeTask() throws Exception {
LOG.info("begin to execute loading task. load id: {} job id: {}. db:
{}, tbl: {}. left retry: {}",
DebugUtil.printId(loadId), callback.getCallbackId(),
db.getFullName(), table.getName(), retryTime);
+
retryTime--;
beginTime = System.currentTimeMillis();
if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
index d6789805e67..06991d5fb52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
@@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
@@ -69,6 +70,8 @@ public abstract class LoadTask extends MasterTask {
protected TaskAttachment attachment;
protected FailMsg failMsg = new FailMsg();
protected int retryTime = 1;
+ private volatile boolean done = false;
+ protected long startTimeMs = 0;
protected final Priority priority;
public LoadTask(LoadTaskCallback callback, TaskType taskType, Priority
priority) {
@@ -82,6 +85,17 @@ public abstract class LoadTask extends MasterTask {
protected void exec() {
boolean isFinished = false;
try {
+ if (Config.isCloudMode()) {
+ while (startTimeMs > System.currentTimeMillis()) {
+ try {
+ Thread.sleep(1000);
+ LOG.info("LoadTask:{} backoff startTimeMs:{} now:{}",
+ signature, startTimeMs,
System.currentTimeMillis());
+ } catch (InterruptedException e) {
+ LOG.info("ignore InterruptedException: ", e);
+ }
+ }
+ }
// execute pending task
executeTask();
// callback on pending task finished
@@ -100,6 +114,7 @@ public abstract class LoadTask extends MasterTask {
// callback on pending task failed
callback.onTaskFailed(signature, failMsg);
}
+ done = true;
}
}
@@ -131,6 +146,14 @@ public abstract class LoadTask extends MasterTask {
return taskType;
}
+ /**
+ * @return true once exec() has reached its end, where the volatile done flag is set
+ */
+ public boolean isDone() {
+ return done;
+ }
+
+ /**
+ * Sets the earliest wall-clock time (ms) at which this task may run.
+ * In cloud mode, exec() sleeps in 1-second steps until System.currentTimeMillis() reaches this value
+ * before calling executeTask() (logged as a backoff).
+ *
+ * @param startTimeMs epoch milliseconds; 0 (the default) means no delay
+ */
+ public void setStartTimeMs(long startTimeMs) {
+ this.startTimeMs = startTimeMs;
+ }
+
public int getPriorityValue() {
return this.priority.value;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index 5c46ed5d862..adbfc27fca2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -41,6 +41,7 @@ import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
import com.google.common.collect.EvictingQueue;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -428,6 +429,16 @@ public class MysqlLoadManager {
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_PARTITIONS, pNames);
}
}
+
+ // cloud cluster
+ if (Config.isCloudMode()) {
+ String clusterName = ConnectContext.get().getCloudCluster();
+ if (Strings.isNullOrEmpty(clusterName)) {
+ throw new LoadException("cloud cluster is empty");
+ }
+ httpPut.addHeader(LoadStmt.KEY_CLOUD_CLUSTER, clusterName);
+ }
+
httpPut.setEntity(entity);
return httpPut;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index d8b79d9bdce..477dc02955f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -110,6 +110,8 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN);
}
tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode());
+ tRoutineLoadTask.setQualifiedUser(routineLoadJob.getQualifiedUser());
+ tRoutineLoadTask.setCloudCluster(routineLoadJob.getCloudCluster());
return tRoutineLoadTask;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index f9be2014e30..b0911f00634 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -262,10 +263,16 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
protected boolean isTypeRead = false;
+ private String cloudClusterId;
+
protected byte enclose = 0;
protected byte escape = 0;
+ // use for cloud cluster mode
+ protected String qualifiedUser;
+ protected String cloudCluster;
+
public void setTypeRead(boolean isTypeRead) {
this.isTypeRead = isTypeRead;
}
@@ -314,6 +321,8 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE,
Long.toString(var.getSqlMode()));
this.memtableOnSinkNode =
ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+ this.qualifiedUser = ConnectContext.get().getQualifiedUser();
+ this.cloudCluster = ConnectContext.get().getCloudCluster();
} else {
sessionVariables.put(SessionVariable.SQL_MODE,
String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
@@ -720,6 +729,14 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
this.comment = comment;
}
+ public String getQualifiedUser() {
+ return qualifiedUser;
+ }
+
+ public String getCloudCluster() {
+ return cloudCluster;
+ }
+
public int getSizeOfRoutineLoadTaskInfoList() {
readLock();
try {
@@ -916,8 +933,28 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
Preconditions.checkNotNull(planner);
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
Table table = db.getTableOrMetaException(tableId,
Table.TableType.OLAP);
+ boolean needCleanCtx = false;
table.readLock();
try {
+ if (Config.isCloudMode()) {
+ String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getClusterNameByClusterId(cloudClusterId);
+ if (Strings.isNullOrEmpty(clusterName)) {
+ String err = String.format("cluster name is empty, cluster
id is %s", cloudClusterId);
+ LOG.warn(err);
+ throw new UserException(err);
+ }
+
+ if (ConnectContext.get() == null) {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
+ ctx.setCloudCluster(clusterName);
+ needCleanCtx = true;
+ } else {
+ ConnectContext.get().setCloudCluster(clusterName);
+ }
+ }
+
TExecPlanFragmentParams planParams = planner.plan(loadId);
// add table indexes to transaction state
TransactionState txnState =
Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId);
@@ -931,6 +968,9 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
return planParams;
} finally {
+ if (needCleanCtx) {
+ ConnectContext.remove();
+ }
table.readUnlock();
}
}
@@ -1486,6 +1526,24 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
this.origStmt = origStmt;
}
+ public void setCloudCluster(String cloudClusterName) throws UserException {
+ if (Strings.isNullOrEmpty(cloudClusterName)) {
+ LOG.warn("cluster name is empty");
+ throw new UserException("cluster name is empty");
+ }
+
+ this.cloudClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCloudClusterIdByName(cloudClusterName);
+ if (Strings.isNullOrEmpty(this.cloudClusterId)) {
+ LOG.warn("cluster id is empty, cluster name {}", cloudClusterName);
+ throw new UserException("cluster id is empty, cluster name: " +
cloudClusterName);
+ }
+ }
+
+ public String getCloudClusterId() {
+ return cloudClusterId;
+ }
+
// check the correctness of commit info
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment,
TransactionState txnState,
@@ -1797,6 +1855,9 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
out.writeBoolean(true);
userIdentity.write(out);
}
+ if (Config.isCloudMode()) {
+ Text.writeString(out, cloudClusterId);
+ }
Text.writeString(out, comment);
}
@@ -1889,6 +1950,9 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
userIdentity = UserIdentity.UNKNOWN;
}
}
+ if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_123 &&
Config.isCloudMode()) {
+ cloudClusterId = Text.readString(in);
+ }
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_117) {
comment = Text.readString(in);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 13a86f5d173..522d320e912 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -153,6 +153,7 @@ public class RoutineLoadManager implements Writable {
}
+ // cloud override
public void createRoutineLoadJob(CreateRoutineLoadStmt
createRoutineLoadStmt)
throws UserException {
// check load auth
@@ -184,7 +185,7 @@ public class RoutineLoadManager implements Writable {
}
public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String
dbName, String tableName)
- throws DdlException {
+ throws UserException {
writeLock();
try {
// check if db.routineLoadName has been used
@@ -542,7 +543,7 @@ public class RoutineLoadManager implements Writable {
* @return
* @throws LoadException
*/
- private List<Long> getAvailableBackendIds(long jobId) throws LoadException
{
+ protected List<Long> getAvailableBackendIds(long jobId) throws
LoadException {
RoutineLoadJob job = getJob(jobId);
if (job == null) {
throw new LoadException("job " + jobId + " does not exist");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
index f84e8bb4c90..43f93df71e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
@@ -1818,7 +1818,7 @@ public class Auth implements Writable {
return sb.toString();
}
- // ====== CLOUD ======
+ // ====== BEGIN CLOUD ======
public List<String> getCloudClusterUsers(String clusterName) {
return propertyMgr.getCloudClusterUsers(userManager.getAllUsers(),
clusterName);
}
@@ -1844,5 +1844,5 @@ public class Auth implements Writable {
}
return cluster;
}
- // ====== CLOUD ======
+ // ====== END CLOUD ======
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
index 4926d5486dc..655a37c7554 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -111,7 +112,8 @@ public class GroupCommitInserter {
for (List<Expr> list : materializedConstExprLists) {
rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
}
- GroupCommitPlanner groupCommitPlanner = new
GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
+ GroupCommitPlanner groupCommitPlanner =
EnvFactory.getInstance().createGroupCommitPlanner(
+ physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
ConnectContext.get().getSessionVariable().getGroupCommit());
PGroupCommitInsertResponse response =
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index c8e7ef6c4e2..e3c4bf0ecec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -72,10 +72,10 @@ import java.util.stream.Collectors;
public class GroupCommitPlanner {
private static final Logger LOG =
LogManager.getLogger(GroupCommitPlanner.class);
- private Database db;
- private OlapTable table;
- private TUniqueId loadId;
- private Backend backend;
+ protected Database db;
+ protected OlapTable table;
+ protected TUniqueId loadId;
+ protected Backend backend;
private TExecPlanFragmentParamsList paramsList;
private ByteString execPlanFragmentParamsBytes;
@@ -131,29 +131,8 @@ public class GroupCommitPlanner {
public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext
ctx,
List<InternalService.PDataRow> rows)
throws DdlException, RpcException, ExecutionException,
InterruptedException {
- backend = ctx.getInsertGroupCommit(this.table.getId());
- if (backend == null || !backend.isAlive() ||
backend.isDecommissioned()) {
- List<Long> allBackendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (allBackendIds.isEmpty()) {
- throw new DdlException("No alive backend");
- }
- Collections.shuffle(allBackendIds);
- boolean find = false;
- for (Long beId : allBackendIds) {
- backend = Env.getCurrentSystemInfo().getBackend(beId);
- if (!backend.isDecommissioned()) {
- ctx.setInsertGroupCommit(this.table.getId(), backend);
- find = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("choose new be {}", backend.getId());
- }
- break;
- }
- }
- if (!find) {
- throw new DdlException("No suitable backend");
- }
- }
+ selectBackends(ctx);
+
PGroupCommitInsertRequest request =
PGroupCommitInsertRequest.newBuilder()
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
.setRequest(execPlanFragmentParamsBytes)
@@ -166,6 +145,32 @@ public class GroupCommitPlanner {
return future.get();
}
+ // cloud override
+ protected void selectBackends(ConnectContext ctx) throws DdlException {
+ backend = ctx.getInsertGroupCommit(this.table.getId());
+ if (backend != null && backend.isAlive() &&
!backend.isDecommissioned()) {
+ return;
+ }
+
+ List<Long> allBackendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
+ if (allBackendIds.isEmpty()) {
+ throw new DdlException("No alive backend");
+ }
+ Collections.shuffle(allBackendIds);
+ for (Long beId : allBackendIds) {
+ backend = Env.getCurrentSystemInfo().getBackend(beId);
+ if (!backend.isDecommissioned()) {
+ ctx.setInsertGroupCommit(this.table.getId(), backend);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("choose new be {}", backend.getId());
+ }
+ return;
+ }
+ }
+
+ throw new DdlException("No suitable backend");
+ }
+
// only for nereids use
public static InternalService.PDataRow getRowStringValue(List<Expr> cols,
int filterSize) throws UserException {
if (cols.isEmpty()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 29243a4abeb..7ee563b6783 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -894,12 +894,15 @@ public class OlapScanNode extends ScanNode {
LOG.debug("backend {} not exists or is not alive for
replica {}", replica.getBackendId(),
replica.getId());
}
- errs.add("replica " + replica.getId() + "'s backend " +
replica.getBackendId()
- + " does not exist or not alive");
- errs.add(" or you may not have permission to access the
current cluster");
- if (ConnectContext.get() != null && Config.isCloudMode()) {
- errs.add("clusterName=" +
ConnectContext.get().getCloudCluster());
+ String err = "replica " + replica.getId() + "'s backend "
+ replica.getBackendId()
+ + " does not exist or not alive";
+ if (Config.isCloudMode()) {
+ err += ", or you may not have permission to access the
current cluster";
+ if (ConnectContext.get() != null) {
+ err += " clusterName=" +
ConnectContext.get().getCloudCluster();
+ }
}
+ errs.add(err);
continue;
}
if (!backend.isMixNode()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index d8c27dd32e5..19c6e7e1d5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -518,11 +518,13 @@ public class OlapTableSink extends DataSink {
Multimap<Long, Long> bePathsMap =
tablet.getNormalReplicaBackendPathMap();
if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
String errMsg = "tablet " + tablet.getId() + " alive
replica num " + bePathsMap.keySet().size()
- + " < quorum replica num " +
loadRequiredReplicaNum
- + ", alive backends: [" +
StringUtils.join(bePathsMap.keySet(), ",") + "]";
- errMsg += " or you may not have permission to access
the current cluster";
- if (ConnectContext.get() != null &&
Config.isCloudMode()) {
- errMsg += " clusterName=" +
ConnectContext.get().getCloudCluster();
+ + " < load required replica num " +
loadRequiredReplicaNum
+ + ", alive backends: [" +
StringUtils.join(bePathsMap.keySet(), ",") + "]";
+ if (Config.isCloudMode()) {
+ errMsg += ", or you may not have permission to
access the current cluster";
+ if (ConnectContext.get() != null) {
+ errMsg += " clusterName=" +
ConnectContext.get().getCloudCluster();
+ }
}
throw new
UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
index c8915966725..6a5fe19fcc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
@@ -98,6 +98,8 @@ public class AuditEvent {
public long peakMemoryBytes = -1;
@AuditField(value = "SqlDigest")
public String sqlDigest = "";
+ @AuditField(value = "cloudClusterName")
+ public String cloudClusterName = "";
@AuditField(value = "TraceId")
public String traceId = "";
@AuditField(value = "WorkloadGroup")
@@ -149,6 +151,11 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setCloudCluster(String cloudClusterName) {
+ auditEvent.cloudClusterName = cloudClusterName;
+ return this;
+ }
+
public AuditEventBuilder setState(String state) {
auditEvent.state = state;
return this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 9311b4ca8e8..fa8b1bdaf3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -32,6 +32,7 @@ import org.apache.doris.plugin.audit.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.FrontendOptions;
+import com.google.common.base.Strings;
import org.apache.commons.codec.digest.DigestUtils;
public class AuditLogHelper {
@@ -44,6 +45,8 @@ public class AuditLogHelper {
long elapseMs = endTime - ctx.getStartTime();
CatalogIf catalog = ctx.getCurrentCatalog();
+ String cluster = Config.isCloudMode() ? ctx.getCloudCluster(false) :
"";
+
AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
auditEventBuilder.reset();
auditEventBuilder
@@ -66,6 +69,7 @@ public class AuditLogHelper {
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
.setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()))
+ .setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" :
cluster)
.setWorkloadGroup(ctx.getWorkloadGroupName())
.setFuzzyVariables(!printFuzzyVariables ? "" :
ctx.getSessionVariable().printFuzzyVariables());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 7e0bd962cf4..4ef75173786 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -1066,6 +1066,10 @@ public class ConnectContext {
this.cloudCluster = cluster;
}
+ public String getCloudCluster() {
+ return getCloudCluster(true);
+ }
+
/**
* @return Returns an available cluster in the following order
* 1 Use an explicitly specified cluster
@@ -1073,7 +1077,11 @@ public class ConnectContext {
* 3 If the user does not have a default cluster, select a cluster
with permissions for the user
* Returns null when there is no available cluster
*/
- public String getCloudCluster() {
+ public String getCloudCluster(boolean updateErr) {
+ if (!Config.isCloudMode()) {
+ return null;
+ }
+
String cluster = null;
if (!Strings.isNullOrEmpty(this.cloudCluster)) {
cluster = this.cloudCluster;
@@ -1091,11 +1099,13 @@ public class ConnectContext {
if (Strings.isNullOrEmpty(cluster)) {
LOG.warn("cant get a valid cluster for user {} to use",
getCurrentUserIdentity());
- getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR,
- "Cant get a Valid cluster for you to use, plz connect
admin");
+ if (updateErr) {
+ getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR,
+ "Cant get a Valid cluster for you to use, plz connect
admin");
+ }
} else {
this.cloudCluster = cluster;
- LOG.info("finally set context cluster name {}", cloudCluster);
+ LOG.info("finally set context cluster name {} for user {}",
cloudCluster, getCurrentUserIdentity());
}
return cluster;
@@ -1103,6 +1113,12 @@ public class ConnectContext {
// TODO implement this function
public String getDefaultCloudCluster() {
+ List<String> cloudClusterNames = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterNames();
+ String defaultCluster =
Env.getCurrentEnv().getAuth().getDefaultCloudCluster(getQualifiedUser());
+ if (!Strings.isNullOrEmpty(defaultCluster) &&
cloudClusterNames.contains(defaultCluster)) {
+ return defaultCluster;
+ }
+
return null;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7a213e47862..b1d591bbdd4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -508,7 +508,7 @@ public class Coordinator implements CoordInterface {
}
// Initialize
- protected void prepare() throws Exception {
+ protected void prepare() throws UserException {
for (PlanFragment fragment : fragments) {
fragmentExecParamsMap.put(fragment.getFragmentId(), new
FragmentExecParams(fragment));
}
@@ -524,7 +524,8 @@ public class Coordinator implements CoordInterface {
coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
- this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
+ this.idToBackend =
Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster();
+
if (LOG.isDebugEnabled()) {
int backendNum = idToBackend.size();
StringBuilder backendInfos = new StringBuilder("backends info:");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 40c126b732d..f0cd170d267 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.thrift.FrontendService;
@@ -28,6 +29,7 @@ import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -159,6 +161,10 @@ public class MasterOpExecutor {
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+ if (Config.isCloudMode() &&
!Strings.isNullOrEmpty(ctx.getCloudCluster())) {
+ params.setCloudCluster(ctx.getCloudCluster());
+ }
+
// query options
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
// session variables
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0304000a1f0..8bfd0d7f1a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -254,6 +254,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit";
+ public static final String CLOUD_ENABLE_MULTI_CLUSTER_SYNC_LOAD =
"enable_multi_cluster_sync_load";
+
public static final String ENABLE_PARALLEL_OUTFILE =
"enable_parallel_outfile";
public static final String SQL_QUOTE_SHOW_CREATE = "sql_quote_show_create";
@@ -773,6 +775,10 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC)
public String fragmentTransmissionCompressionCodec = "none";
+ // whether sync load to other cluster
+ @VariableMgr.VarAttr(name = CLOUD_ENABLE_MULTI_CLUSTER_SYNC_LOAD,
needForward = true)
+ public static boolean cloudEnableMultiClusterSyncLoad = false;
+
/*
* the parallel exec instance num for one Fragment in one BE
* 1 means disable this feature
@@ -1969,6 +1975,14 @@ public class SessionVariable implements Serializable,
Writable {
return autoCommit;
}
+ public boolean enableMultiClusterSyncLoad() {
+ return cloudEnableMultiClusterSyncLoad;
+ }
+
+ public void setEnableMultiClusterSyncLoad(boolean
cloudEnableMultiClusterSyncLoad) {
+ this.cloudEnableMultiClusterSyncLoad = cloudEnableMultiClusterSyncLoad;
+ }
+
public boolean isTxReadonly() {
return txReadonly;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 6008dc32c41..53ff62edb68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -744,7 +744,12 @@ public class ShowExecutor {
}
// Show clusters
- private void handleShowCluster() {
+ private void handleShowCluster() throws AnalysisException {
+ if (!Config.isCloudMode()) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_CLOUD_MODE);
+ return;
+ }
+
final ShowClusterStmt showStmt = (ShowClusterStmt) stmt;
final List<List<String>> rows = Lists.newArrayList();
List<String> clusterNames = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index d88663ad6fc..72073e1e847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -53,6 +53,7 @@ import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.ReplaceTableClause;
+import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SetOperationStmt;
import org.apache.doris.analysis.SetStmt;
@@ -90,8 +91,11 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.cloud.analysis.UseCloudClusterStmt;
import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuditLog;
+import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@@ -165,17 +169,22 @@ import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
+import org.apache.doris.thrift.BackendService.Client;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TMergeType;
+import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TSyncLoadForTabletsRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TabletCommitInfo;
@@ -210,6 +219,8 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
// Do one COM_QUERY process.
@@ -221,6 +232,7 @@ public class StmtExecutor {
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
public static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
public static final String NULL_VALUE_FOR_LOAD = "\\N";
+ private Pattern beIpPattern = Pattern.compile("\\[(\\d+):");
private final Object writeProfileLock = new Object();
private ConnectContext context;
private final StatementContext statementContext;
@@ -476,7 +488,34 @@ public class StmtExecutor {
public void execute() throws Exception {
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
- execute(queryId);
+ TUniqueId firstQueryId = queryId;
+ int retryTime = Config.max_query_retry_time;
+ for (int i = 1; i <= retryTime; i++) {
+ try {
+ execute(queryId);
+ return;
+ } catch (UserException e) {
+ if (!e.getMessage().contains("E-230") || i == retryTime) {
+ throw e;
+ }
+ TUniqueId lastQueryId = queryId;
+ uuid = UUID.randomUUID();
+ queryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
+ int randomMillis = 10 + (int) (Math.random() * 10);
+ if (i > retryTime / 2) {
+ randomMillis = 20 + (int) (Math.random() * 10);
+ }
+ if (DebugPointUtil.isEnable("StmtExecutor.retry.longtime")) {
+ randomMillis = 1000;
+ }
+ LOG.warn("receive E-230 tried={} first queryId={} last
queryId={} new queryId={} sleep={}ms",
+ i, DebugUtil.printId(firstQueryId),
DebugUtil.printId(lastQueryId),
+ DebugUtil.printId(queryId), randomMillis);
+ Thread.sleep(randomMillis);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
}
public boolean notAllowFallback() {
@@ -718,14 +757,62 @@ public class StmtExecutor {
AuditLog.getQueryAudit().log("Query {} {} times with new
query id: {}",
DebugUtil.printId(queryId), i,
DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
+ if (Config.isCloudMode()) {
+ // sleep random millis [1000, 1500] ms
+ // in the begining of retryTime/2
+ int randomMillis = 1000 + (int) (Math.random() * (1000
- 500));
+ LOG.debug("stmt executor retry times {}, wait
randomMillis:{}, stmt:{}",
+ i, randomMillis, originStmt.originStmt);
+ try {
+ if (i > retryTime / 2) {
+ // sleep random millis [2000, 2500] ms
+ // in the ending of retryTime/2
+ randomMillis = 2000 + (int) (Math.random() *
(1000 - 500));
+ }
+ Thread.sleep(randomMillis);
+ } catch (InterruptedException e) {
+ LOG.info("stmt executor sleep wait
InterruptedException: ", e);
+ }
+ }
}
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
handleQueryStmt();
break;
- } catch (RpcException e) {
- if (i == retryTime - 1) {
+ } catch (RpcException | UserException e) {
+ // cloud mode retry
+ LOG.debug("due to exception {} retry {} rpc {} user {}",
+ e.getMessage(), i, e instanceof RpcException, e
instanceof UserException);
+ // errCode = 2, detailMessage = There is no scanNode Backend
available.[10003: not alive]
+ List<String> bes =
Env.getCurrentSystemInfo().getAllBackendIds().stream()
+ .map(id ->
Long.toString(id)).collect(Collectors.toList());
+ String msg = e.getMessage();
+ boolean isNeedRetry = true;
+ if (e instanceof UserException
+ &&
msg.contains(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG)) {
+ isNeedRetry = false;
+ Matcher matcher = beIpPattern.matcher(msg);
+ // here retry planner not be recreated, so
+ // in cloud mode drop node, be id invalid, so need not
retry
+ // such as be ids [11000, 11001] -> after drop node 11001
+ // don't need to retry 11001's request
+ if (matcher.find()) {
+ String notAliveBe = matcher.group(1);
+ isNeedRetry = bes.contains(notAliveBe);
+ if (isNeedRetry) {
+ Backend abnormalBe =
Env.getCurrentSystemInfo().getBackend(Long.parseLong(notAliveBe));
+ String deadCloudClusterStatus =
abnormalBe.getCloudClusterStatus();
+ String deadCloudClusterClusterName =
abnormalBe.getCloudClusterName();
+ LOG.info("need retry cluster {} status {}-{}",
deadCloudClusterClusterName,
+ deadCloudClusterStatus,
ClusterStatus.valueOf(deadCloudClusterStatus));
+ if (ClusterStatus.valueOf(deadCloudClusterStatus)
!= ClusterStatus.NORMAL) {
+
CloudSystemInfoService.waitForAutoStart(deadCloudClusterClusterName);
+ }
+ }
+ }
+ }
+ if (i == retryTime - 1 || !isNeedRetry) {
throw e;
}
if (context.getConnectType().equals(ConnectType.MYSQL) &&
!context.getMysqlChannel().isSend()) {
@@ -1008,6 +1095,14 @@ public class StmtExecutor {
}
}
+ private boolean hasCloudClusterPriv() {
+ if (ConnectContext.get() == null ||
Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster())) {
+ return false;
+ }
+ return
Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
+ ConnectContext.get().getCloudCluster(), PrivPredicate.USAGE,
ResourceTypeEnum.CLUSTER);
+ }
+
// Analyze one statement to structure in memory.
public void analyze(TQueryOptions tQueryOptions) throws UserException,
InterruptedException {
if (LOG.isDebugEnabled()) {
@@ -1136,6 +1231,27 @@ public class StmtExecutor {
resetAnalyzerAndStmt();
}
} catch (UserException e) {
+ // cloud mode retry, when retry need check this user has
cloud cluster auth.
+ // if user doesn't have cloud cluster auth, don't retry,
just return.
+ if (Config.isCloudMode()
+ &&
(e.getMessage().contains(SystemInfoService.NOT_USING_VALID_CLUSTER_MSG)
+ || e.getMessage().contains("backend -1"))
+ && hasCloudClusterPriv()) {
+ LOG.debug("cloud mode analyzeAndGenerateQueryPlan
retry times {}", i);
+ // sleep random millis [500, 1000] ms
+ int randomMillis = 500 + (int) (Math.random() * (1000
- 500));
+ try {
+ if (i > analyzeTimes / 2) {
+ randomMillis = 1000 + (int) (Math.random() *
(1000 - 500));
+ }
+ Thread.sleep(randomMillis);
+ } catch (InterruptedException ie) {
+ LOG.info("stmt executor sleep wait
InterruptedException: ", ie);
+ }
+ if (i < analyzeTimes) {
+ continue;
+ }
+ }
throw e;
} catch (Exception e) {
LOG.warn("Analyze failed. {}",
context.getQueryIdentifier(), e);
@@ -2101,6 +2217,23 @@ public class StmtExecutor {
txnStatus = TransactionStatus.COMMITTED;
}
+ // TODO(meiyi)
+ // insertStmt.afterFinishTxn(true);
+ if (Config.isCloudMode()) {
+ String clusterName = context.getCloudCluster();
+ if
(context.getSessionVariable().enableMultiClusterSyncLoad()
+ && clusterName != null && !clusterName.isEmpty()) {
+ CloudSystemInfoService infoService =
(CloudSystemInfoService) Env.getCurrentSystemInfo();
+ List<List<Backend>> backendsList = infoService
+
.getCloudClusterNames()
+ .stream()
+ .filter(name
-> !name.equals(clusterName))
+ .map(name ->
infoService.getBackendsByClusterName(name))
+
.collect(Collectors.toList());
+ List<Long> allTabletIds = ((OlapTable)
insertStmt.getTargetTable()).getAllTabletIds();
+ syncLoadForTablets(backendsList, allTabletIds);
+ }
+ }
} catch (Throwable t) {
// if any throwable being thrown during insert operation,
first we should abort this txn
LOG.warn("handle insert stmt fail: {}", label, t);
@@ -2173,6 +2306,41 @@ public class StmtExecutor {
context.updateReturnRows((int) loadedRows);
}
+ public static void syncLoadForTablets(List<List<Backend>> backendsList,
List<Long> allTabletIds) {
+ backendsList.forEach(backends -> backends.forEach(backend -> {
+ if (backend.isAlive()) {
+ List<Long> tabletIdList = new ArrayList<Long>();
+ Set<Long> beTabletIds = null;
+ // TODO(merge-cloud): need implements cloud rebalancer,
otherwise raise beTabletIds NPE
+ //Set<Long> beTabletIds = Env.getCurrentEnv()
+ // .getCloudTabletRebalancer()
+ //
.getSnapshotTabletsByBeId(backend.getId());
+ allTabletIds.forEach(tabletId -> {
+ if (beTabletIds.contains(tabletId)) {
+ tabletIdList.add(tabletId);
+ }
+ });
+ boolean ok = false;
+ TNetworkAddress address = null;
+ Client client = null;
+ try {
+ address = new TNetworkAddress(backend.getHost(),
backend.getBePort());
+ client = ClientPool.backendPool.borrowObject(address);
+ client.syncLoadForTablets(new
TSyncLoadForTabletsRequest(allTabletIds));
+ ok = true;
+ } catch (Exception e) {
+ LOG.warn(e.getMessage());
+ } finally {
+ if (!ok) {
+ ClientPool.backendPool.invalidateObject(address,
client);
+ } else {
+ ClientPool.backendPool.returnObject(address, client);
+ }
+ }
+ }
+ }));
+ }
+
private void handleExternalInsertStmt() {
// TODO(tsy): load refactor, handle external load here
try {
@@ -2257,6 +2425,11 @@ public class StmtExecutor {
}
private void handleUseCloudClusterStmt() throws AnalysisException {
+ if (!Config.isCloudMode()) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_CLOUD_MODE);
+ return;
+ }
+
UseCloudClusterStmt useCloudClusterStmt = (UseCloudClusterStmt)
parsedStmt;
try {
((CloudEnv)
context.getEnv()).changeCloudCluster(useCloudClusterStmt.getCluster(), context);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 36d24ddfc4d..d2ce1a7194a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -44,6 +44,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.planner.CloudStreamLoadPlanner;
import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
@@ -988,6 +989,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
ConnectContext context = new ConnectContext(null, true);
// Set current connected FE to the client address, so that we can know
where this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());
+ if (Config.isCloudMode() &&
!Strings.isNullOrEmpty(params.getCloudCluster())) {
+ context.setCloudCluster(params.getCloudCluster());
+ }
ConnectProcessor processor = null;
if (context.getConnectType().equals(ConnectType.MYSQL)) {
@@ -1941,6 +1945,41 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
List<String> tableNames = request.getTableNames();
+
+ if (Config.isCloudMode()) {
+ try {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
+ ctx.setQualifiedUser(request.getUser());
+ ctx.setRemoteIP(request.getUserIp());
+ String userName =
ClusterNamespace.getNameFromFullName(request.getUser());
+ if (userName != null) {
+ List<UserIdentity> currentUser = Lists.newArrayList();
+ try {
+
Env.getCurrentEnv().getAuth().checkPlainPassword(userName,
+ request.getUserIp(), request.getPasswd(),
currentUser);
+ } catch (AuthenticationException e) {
+ throw new UserException(e.formatErrMsg());
+ }
+ Preconditions.checkState(currentUser.size() == 1);
+ ctx.setCurrentUserIdentity(currentUser.get(0));
+ }
+ LOG.info("one stream multi table load use cloud cluster {}",
request.getCloudCluster());
+ //ctx.setCloudCluster();
+ if (!Strings.isNullOrEmpty(request.getCloudCluster())) {
+ if (Strings.isNullOrEmpty(request.getUser())) {
+ ctx.setCloudCluster(request.getCloudCluster());
+ } else {
+ ((CloudEnv)
Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx);
+ }
+ }
+ } catch (UserException e) {
+ LOG.warn("failed to set ConnectContext info: {}",
e.getMessage());
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(e.getMessage());
+ }
+ }
+
try {
if (CollectionUtils.isEmpty(tableNames)) {
throw new MetaNotFoundException("table not found");
@@ -2101,6 +2140,49 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest
request, TStreamLoadPutResult result)
throws UserException {
+ if (request.isSetAuthCode()) {
+ String clientAddr = getClientAddrAsString();
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
+ ctx.setRemoteIP(clientAddr);
+ long backendId = request.getBackendId();
+ Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+ Preconditions.checkNotNull(backend);
+ ctx.setCloudCluster(backend.getCloudClusterName());
+ LOG.info("streamLoadPutImpl set context: cluster {}",
ctx.getCloudCluster());
+ } else if (Config.isCloudMode()) {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
+ ctx.setQualifiedUser(request.getUser());
+ ctx.setRemoteIP(request.getUserIp());
+ String userName =
ClusterNamespace.getNameFromFullName(request.getUser());
+ if (userName != null) {
+ List<UserIdentity> currentUser = Lists.newArrayList();
+ try {
+ Env.getCurrentEnv().getAuth().checkPlainPassword(userName,
+ request.getUserIp(), request.getPasswd(),
currentUser);
+ } catch (AuthenticationException e) {
+ throw new UserException(e.formatErrMsg());
+ }
+ Preconditions.checkState(currentUser.size() == 1);
+ ctx.setCurrentUserIdentity(currentUser.get(0));
+ }
+
+ LOG.info("stream load use cloud cluster {}",
request.getCloudCluster());
+ if (!Strings.isNullOrEmpty(request.getCloudCluster())) {
+ if (Strings.isNullOrEmpty(request.getUser())) {
+ // mysql load
+ ctx.setCloudCluster(request.getCloudCluster());
+ } else {
+ // stream load
+ ((CloudEnv)
Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx);
+ }
+ }
+
+ LOG.debug("streamLoadPutImpl set context: cluster {},
setCurrentUserIdentity {}",
+ ctx.getCloudCluster(), ctx.getCurrentUserIdentity());
+ }
+
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index d8822633937..6e6268020c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -277,6 +277,18 @@ public class Backend implements Writable {
this.backendStatus.isLoadDisabled = isLoadDisabled;
}
+ public void setActive(boolean isActive) {
+ this.backendStatus.isActive = isActive;
+ }
+
+ public boolean isActive() {
+ return this.backendStatus.isActive;
+ }
+
+ public long getCurrentFragmentNum() {
+ return this.backendStatus.currentFragmentNum;
+ }
+
// for test only
public void updateOnce(int bePort, int httpPort, int beRpcPort) {
if (this.bePort != bePort) {
@@ -779,6 +791,10 @@ public class Backend implements Writable {
this.lastStartTime = hbResponse.getBeStartTime();
isChanged = true;
}
+
+ this.backendStatus.currentFragmentNum =
hbResponse.getFragmentNum();
+ this.backendStatus.lastFragmentUpdateTime =
hbResponse.getLastFragmentUpdateTime();
+
heartbeatErrMsg = "";
this.heartbeatFailureCounter = 0;
} else {
@@ -834,6 +850,12 @@ public class Backend implements Writable {
public volatile boolean isQueryDisabled = false;
@SerializedName("isLoadDisabled")
public volatile boolean isLoadDisabled = false;
+ @SerializedName("isActive")
+ public volatile boolean isActive = true;
+
+ // cloud mode, cloud control just query master, so not need
SerializedName
+ public volatile long currentFragmentNum = 0;
+ public volatile long lastFragmentUpdateTime = 0;
@Override
public String toString() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
index 0b347f0cbb2..e009ab4abf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
@@ -48,6 +48,8 @@ public class BackendHbResponse extends HeartbeatResponse
implements Writable {
private long beStartTime = 0;
private String host;
private String version = "";
+ private long fragmentNum;
+ private long lastFragmentUpdateTime;
@SerializedName(value = "isShutDown")
private boolean isShutDown = false;
@@ -56,7 +58,8 @@ public class BackendHbResponse extends HeartbeatResponse
implements Writable {
}
public BackendHbResponse(long beId, int bePort, int httpPort, int
brpcPort, long hbTime, long beStartTime,
- String version, String nodeRole, boolean isShutDown, int
arrowFlightSqlPort) {
+ String version, String nodeRole, long fragmentNum, long
lastFragmentUpdateTime,
+ boolean isShutDown, int arrowFlightSqlPort) {
super(HeartbeatResponse.Type.BACKEND);
this.beId = beId;
this.status = HbStatus.OK;
@@ -67,6 +70,8 @@ public class BackendHbResponse extends HeartbeatResponse
implements Writable {
this.beStartTime = beStartTime;
this.version = version;
this.nodeRole = nodeRole;
+ this.fragmentNum = fragmentNum;
+ this.lastFragmentUpdateTime = lastFragmentUpdateTime;
this.isShutDown = isShutDown;
this.arrowFlightSqlPort = arrowFlightSqlPort;
}
@@ -86,6 +91,14 @@ public class BackendHbResponse extends HeartbeatResponse
implements Writable {
this.msg = errMsg;
}
+ public long getFragmentNum() {
+ return fragmentNum;
+ }
+
+ public long getLastFragmentUpdateTime() {
+ return lastFragmentUpdateTime;
+ }
+
public long getBeId() {
return beId;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index 2c766221acb..dfb1ffeaae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -20,12 +20,12 @@ package org.apache.doris.system;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;
-import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -172,7 +172,7 @@ public class BeSelectionPolicy {
return true;
}
- public List<Backend> getCandidateBackends(ImmutableCollection<Backend>
backends) {
+ public List<Backend> getCandidateBackends(Collection<Backend> backends) {
List<Backend> filterBackends =
backends.stream().filter(this::isMatch).collect(Collectors.toList());
List<Backend> preLocationFilterBackends = filterBackends.stream()
.filter(iterm ->
preferredLocations.contains(iterm.getHost())).collect(Collectors.toList());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 5d17846476f..e09c7f2b991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -268,12 +268,17 @@ public class HeartbeatMgr extends MasterDaemon {
if (tBackendInfo.isSetBeNodeRole()) {
nodeRole = tBackendInfo.getBeNodeRole();
}
+
+ long fragmentNum =
tBackendInfo.getFragmentExecutingCount();
+ long lastFragmentUpdateTime =
tBackendInfo.getFragmentLastActiveTime();
+
boolean isShutDown = false;
if (tBackendInfo.isSetIsShutdown()) {
isShutDown = tBackendInfo.isIsShutdown();
}
return new BackendHbResponse(backendId, bePort, httpPort,
brpcPort,
- System.currentTimeMillis(), beStartTime, version,
nodeRole, isShutDown, arrowFlightSqlPort);
+ System.currentTimeMillis(), beStartTime, version,
nodeRole,
+ fragmentNum, lastFragmentUpdateTime, isShutDown,
arrowFlightSqlPort);
} else {
return new BackendHbResponse(backendId, backend.getHost(),
result.getStatus().getErrorMsgs().isEmpty()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index d31e786c384..891a6dc36fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -993,12 +993,35 @@ public class SystemInfoService {
return bes.stream().filter(b ->
b.getLocationTag().equals(tag)).collect(Collectors.toList());
}
+ // CloudSystemInfoService override
+ public List<Backend> getBackendsByCurrentCluster() throws UserException {
+ return idToBackendRef.values().stream().collect(Collectors.toList());
+ }
+
+ // CloudSystemInfoService override
+ public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster()
throws UserException {
+ return getIdToBackend();
+ }
+
public int getMinPipelineExecutorSize() {
- if (idToBackendRef.size() == 0) {
+ if (Config.isCloudMode() && ConnectContext.get() != null
+ &&
Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster(false))) {
+ return 1;
+ }
+ List<Backend> currentBackends = null;
+ try {
+ currentBackends = getBackendsByCurrentCluster();
+ } catch (UserException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get current cluster backends failed: ", e);
+ }
+ return 1;
+ }
+ if (currentBackends.size() == 0) {
return 1;
}
int minPipelineExecutorSize = Integer.MAX_VALUE;
- for (Backend be : idToBackendRef.values()) {
+ for (Backend be : currentBackends) {
int size = be.getPipelineExecutorSize();
if (size > 0) {
minPipelineExecutorSize = Math.min(minPipelineExecutorSize,
size);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 156dae72234..07e5b725aff 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -217,7 +217,7 @@ public class RoutineLoadManagerTest {
try {
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db",
"table");
Assert.fail();
- } catch (DdlException e) {
+ } catch (UserException e) {
LOG.info(e.getMessage());
}
}
@@ -225,7 +225,7 @@ public class RoutineLoadManagerTest {
@Test
public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext
connectContext,
@Mocked Env env,
- @Mocked EditLog editLog)
throws DdlException {
+ @Mocked EditLog editLog)
throws UserException {
String jobName = "job1";
String topicName = "topic1";
String serverAddress = "http://127.0.0.1:8080";
@@ -761,7 +761,7 @@ public class RoutineLoadManagerTest {
@Test
public void testCheckBeToTask(@Mocked Env env,
- @Mocked SystemInfoService systemInfoService)
throws LoadException, DdlException {
+ @Mocked SystemInfoService systemInfoService)
throws UserException {
List<Long> beIdsInCluster = Lists.newArrayList();
beIdsInCluster.add(1L);
Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index f5994f89a89..0391c7c602c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -21,10 +21,10 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.RoutineLoadDesc;
@@ -128,7 +128,7 @@ public class RoutineLoadSchedulerTest {
public void functionTest(@Mocked Env env, @Mocked InternalCatalog catalog,
@Mocked SystemInfoService systemInfoService, @Injectable Database
database)
- throws DdlException, InterruptedException {
+ throws UserException, InterruptedException {
new Expectations() {
{
minTimes = 0;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index e933c0df17c..0ac72df4a6d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -98,7 +98,7 @@ public class SystemInfoServiceTest {
System.out.println(Env.getCurrentEnvJournalVersion());
BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234,
1234, 1234, 1234, 1234, "test",
- Tag.VALUE_COMPUTATION, false, 1234);
+ Tag.VALUE_COMPUTATION, 10, 100, false, 1234);
// Write objects to file
File file1 = new File("./BackendHbResponseSerialization");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
index f359d6e6a9e..c9e3c50d998 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
@@ -201,7 +201,8 @@ public class DemoMultiBackendsTest {
Assert.assertEquals("{\"location\" : \"default\"}",
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size()
- 6));
Assert.assertEquals(
-
"{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
+
"{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,"
+ +
"\"isLoadDisabled\":false,\"isActive\":true,\"currentFragmentNum\":0,\"lastFragmentUpdateTime\":0}",
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size()
- 3));
Assert.assertEquals("0",
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 2));
Assert.assertEquals(Tag.VALUE_MIX,
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 1));
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index 376e2a34df9..127852ae6b4 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -68,6 +68,8 @@ struct TRoutineLoadTask {
15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
16: optional bool is_multi_table
17: optional bool memtable_on_sink_node;
+ 18: optional string qualified_user
+ 19: optional string cloud_cluster
}
struct TKafkaMetaProxyRequest {
diff --git
a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index 07aced68f60..3a26e153be5 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -18,5 +18,18 @@
testGroups = "p0"
//exclude groups and exclude suites is more prior than include groups and
include suites.
excludeSuites =
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_stream_load_new_move_memtable,test_stream_load_move_memtable,test_materialized_view_move_memtable,test_disable_move_memtable,test_insert_move_memtable,set_and_unset_variable,test_pk_uk_case_cluster,test_point_query_cluster_key,test_compaction_uniq_cluster_keys_with_delete,test_compaction_uniq_keys_cluster_key,test_set_partit
[...]
-excludeDirectories =
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,unique_with_mow_p0/ssb_unique_load_zstd_c,nereids_rules_p0/mv,backup_restore,cold_heat_separation,storage_medium_p0"
+
+excludeDirectories = """
+ cloud/multi_cluster,
+ workload_manager_p1,
+ nereids_rules_p0/subquery,
+ unique_with_mow_p0/cluster_key,
+ unique_with_mow_p0/ssb_unique_sql_zstd_cluster,
+ unique_with_mow_p0/ssb_unique_load_zstd_c,
+ nereids_rules_p0/mv,
+ backup_restore,
+ cold_heat_separation,
+ storage_medium_p0
+"""
+
max_failure_num = 200
diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy
b/regression-test/pipeline/p0/conf/regression-conf.groovy
index b99e21c4e7c..d8de4fd5059 100644
--- a/regression-test/pipeline/p0/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p0/conf/regression-conf.groovy
@@ -59,7 +59,11 @@ excludeGroups = ""
excludeSuites =
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external"
// this directories will not be executed
-excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery"
+excludeDirectories = """
+ cloud,
+ nereids_rules_p0/subquery,
+ workload_manager_p1
+"""
customConf1 = "test_custom_conf_value"
diff --git a/regression-test/plugins/plugin_cluster.groovy
b/regression-test/plugins/plugin_cluster.groovy
new file mode 100644
index 00000000000..15237e29aa8
--- /dev/null
+++ b/regression-test/plugins/plugin_cluster.groovy
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.Suite
+
+Suite.metaClass.add_cluster = { be_unique_id, ip, port, cluster_name,
cluster_id ->
+ def jsonOutput = new JsonOutput()
+ def s3 = [
+ type: 'COMPUTE',
+ cluster_name : cluster_name,
+ cluster_id : cluster_id,
+ nodes: [
+ [
+ cloud_unique_id: be_unique_id,
+ ip: ip,
+ heartbeat_port: port
+ ],
+ ]
+ ]
+ def map = [instance_id: "${instance_id}", cluster: s3]
+ def js = jsonOutput.toJson(map)
+ log.info("add cluster req: ${js} ".toString())
+
+ def add_cluster_api = { request_body, check_func ->
+ httpTest {
+ endpoint context.config.metaServiceHttpAddress
+ uri "/MetaService/http/add_cluster?token=${token}"
+ body request_body
+ check check_func
+ }
+ }
+
+ add_cluster_api.call(js) {
+ respCode, body ->
+ log.info("add cluster resp: ${body} ${respCode}".toString())
+ def json = parseJson(body)
+ assertTrue(json.code.equalsIgnoreCase('OK') ||
json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+ }
+}
+
+Suite.metaClass.get_cluster = { be_unique_id ->
+ def jsonOutput = new JsonOutput()
+ def map = [instance_id: "${instance_id}", cloud_unique_id:
"${be_unique_id}" ]
+ def js = jsonOutput.toJson(map)
+ log.info("get cluster req: ${js} ".toString())
+
+ def add_cluster_api = { request_body, check_func ->
+ httpTest {
+ endpoint context.config.metaServiceHttpAddress
+ uri "/MetaService/http/get_cluster?token=${token}"
+ body request_body
+ check check_func
+ }
+ }
+
+ def json
+ add_cluster_api.call(js) {
+ respCode, body ->
+ log.info("get cluster resp: ${body} ${respCode}".toString())
+ json = parseJson(body)
+ assertTrue(json.code.equalsIgnoreCase('OK') ||
json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+ }
+ json.result.cluster
+}
+
+Suite.metaClass.drop_cluster = { cluster_name, cluster_id ->
+ def jsonOutput = new JsonOutput()
+ def reqBody = [
+ type: 'COMPUTE',
+ cluster_name : cluster_name,
+ cluster_id : cluster_id,
+ nodes: [
+ ]
+ ]
+ def map = [instance_id: "${instance_id}", cluster: reqBody]
+ def js = jsonOutput.toJson(map)
+ log.info("drop cluster req: ${js} ".toString())
+
+ def drop_cluster_api = { request_body, check_func ->
+ httpTest {
+ endpoint context.config.metaServiceHttpAddress
+ uri "/MetaService/http/drop_cluster?token=${token}"
+ body request_body
+ check check_func
+ }
+ }
+
+ drop_cluster_api.call(js) {
+ respCode, body ->
+ log.info("dorp cluster resp: ${body} ${respCode}".toString())
+ def json = parseJson(body)
+ assertTrue(json.code.equalsIgnoreCase('OK') ||
json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+ }
+}
+
+Suite.metaClass.add_node = { be_unique_id, ip, port, cluster_name, cluster_id
->
+ def jsonOutput = new JsonOutput()
+ def clusterInfo = [
+ type: 'COMPUTE',
+ cluster_name : cluster_name,
+ cluster_id : cluster_id,
+ nodes: [
+ [
+ cloud_unique_id: be_unique_id,
+ ip: ip,
+ heartbeat_port: port
+ ],
+ ]
+ ]
+ def map = [instance_id: "${instance_id}", cluster: clusterInfo]
+ def js = jsonOutput.toJson(map)
+ log.info("add node req: ${js} ".toString())
+
+ def add_cluster_api = { request_body, check_func ->
+ httpTest {
+ endpoint context.config.metaServiceHttpAddress
+ uri "/MetaService/http/add_node?token=${token}"
+ body request_body
+ check check_func
+ }
+ }
+
+ add_cluster_api.call(js) {
+ respCode, body ->
+ log.info("add node resp: ${body} ${respCode}".toString())
+ def json = parseJson(body)
+ assertTrue(json.code.equalsIgnoreCase('OK') ||
json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+ }
+}
+
+Suite.metaClass.d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
+ def jsonOutput = new JsonOutput()
+ def clusterInfo = [
+ type: 'COMPUTE',
+ cluster_name : cluster_name,
+ cluster_id : cluster_id,
+ nodes: [
+ [
+ cloud_unique_id: be_unique_id,
+ ip: ip,
+ heartbeat_port: port
+ ],
+ ]
+ ]
+ def map = [instance_id: "${instance_id}", cluster: clusterInfo]
+ def js = jsonOutput.toJson(map)
+ log.info("decommission node req: ${js} ".toString())
+
+ def d_cluster_api = { request_body, check_func ->
+ httpTest {
+ endpoint context.config.metaServiceHttpAddress
+ uri "/MetaService/http/decommission_node?token=${token}"
+ body request_body
+ check check_func
+ }
+ }
+
+ d_cluster_api.call(js) {
+ respCode, body ->
+ log.info("decommission node resp: ${body} ${respCode}".toString())
+ def json = parseJson(body)
+ assertTrue(json.code.equalsIgnoreCase('OK') ||
json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]