This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 8eb7e3b058a [improve](task) Support splitting agent batch tasks automatically #42703 (#42987)
8eb7e3b058a is described below
commit 8eb7e3b058a3e82aae1b50289f29ac155bb3646b
Author: walter <[email protected]>
AuthorDate: Thu Oct 31 21:14:09 2024 +0800
[improve](task) Support splitting agent batch tasks automatically #42703 (#42987)
cherry pick from #42703
---
.../main/java/org/apache/doris/common/Config.java | 7 +++++
.../java/org/apache/doris/backup/BackupJob.java | 9 +++---
.../java/org/apache/doris/backup/RestoreJob.java | 14 +++++-----
.../java/org/apache/doris/task/AgentBatchTask.java | 32 ++++++++++++++++++----
4 files changed, 44 insertions(+), 18 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 509a266365c..8aedeb091f3 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2418,6 +2418,13 @@ public class Config extends ConfigBase {
})
public static int restore_download_task_num_per_be = 3;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
+ "The max number of batched tasks per RPC assigned to each be during the backup/restore process, "
+ + "the default value is 10000."
+ })
+ public static int backup_restore_batch_task_num_per_rpc = 10000;
+
@ConfField(description = {"是否开启通过http接口获取log文件的功能",
"Whether to enable the function of getting log files through http
interface"})
public static boolean enable_get_log_file_api = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 9ee4f6d0603..4315ad8ee4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -209,11 +209,10 @@ public class BackupJob extends AbstractJob {
task.getIndexId(), task.getTabletId(),
task.getVersion(),
task.getSchemaHash(), timeoutMs, false /* not restore task */);
- AgentBatchTask batchTask = new AgentBatchTask();
- batchTask.addTask(newTask);
unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
//send task
+ AgentBatchTask batchTask = new AgentBatchTask(newTask);
AgentTaskQueue.addTask(newTask);
AgentTaskExecutor.submit(batchTask);
@@ -447,7 +446,7 @@ public class BackupJob extends AbstractJob {
// copy all related schema at this moment
List<Table> copiedTables = Lists.newArrayList();
List<Resource> copiedResources = Lists.newArrayList();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (TableRef tableRef : tableRefs) {
String tblName = tableRef.getName().getTbl();
Table tbl = db.getTableNullable(tblName);
@@ -695,7 +694,7 @@ public class BackupJob extends AbstractJob {
beToSnapshots.put(info.getBeId(), info);
}
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> infos = beToSnapshots.get(beId);
int totalNum = infos.size();
@@ -851,7 +850,7 @@ public class BackupJob extends AbstractJob {
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null,
info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 173af1c25f3..fb96fdbc0bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -859,7 +859,7 @@ public class RestoreJob extends AbstractJob {
AgentBatchTask batchTask =
batchTaskPerTable.get(localTbl.getId());
if (batchTask == null) {
- batchTask = new AgentBatchTask();
+ batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTaskPerTable.put(localTbl.getId(), batchTask);
}
createReplicas(db, batchTask, localTbl, restorePart);
@@ -875,7 +875,7 @@ public class RestoreJob extends AbstractJob {
for (Partition restorePart :
restoreOlapTable.getPartitions()) {
AgentBatchTask batchTask =
batchTaskPerTable.get(restoreTbl.getId());
if (batchTask == null) {
- batchTask = new AgentBatchTask();
+ batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTaskPerTable.put(restoreTbl.getId(),
batchTask);
}
createReplicas(db, batchTask, restoreOlapTable,
restorePart, tabletBases);
@@ -1128,7 +1128,7 @@ public class RestoreJob extends AbstractJob {
taskProgress.clear();
taskErrMsg.clear();
Multimap<Long, Long> bePathsMap = HashMultimap.create();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
db.readLock();
try {
for (Map.Entry<IdChain, IdChain> entry :
fileMapping.getMapping().entrySet()) {
@@ -1592,7 +1592,7 @@ public class RestoreJob extends AbstractJob {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
@@ -1745,7 +1745,7 @@ public class RestoreJob extends AbstractJob {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
@@ -1922,7 +1922,7 @@ public class RestoreJob extends AbstractJob {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
// tablet id->(be id -> download info)
for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
SnapshotInfo info = cell.getValue();
@@ -2111,7 +2111,7 @@ public class RestoreJob extends AbstractJob {
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null,
info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 848211f9413..b1839400d31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -51,6 +51,7 @@ import org.apache.doris.thrift.TUploadReq;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import java.util.HashMap;
import java.util.LinkedList;
@@ -63,6 +64,8 @@ import java.util.Map;
public class AgentBatchTask implements Runnable {
private static final Logger LOG =
LogManager.getLogger(AgentBatchTask.class);
+ private int batchSize = Integer.MAX_VALUE;
+
// backendId -> AgentTask List
private Map<Long, List<AgentTask>> backendIdToTasks;
@@ -70,6 +73,12 @@ public class AgentBatchTask implements Runnable {
this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
}
+ public AgentBatchTask(int batchSize) {
+ this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
+ this.batchSize = batchSize;
+ assert batchSize > 0;
+ }
+
public AgentBatchTask(AgentTask singleTask) {
this();
addTask(singleTask);
@@ -168,14 +177,12 @@ public class AgentBatchTask implements Runnable {
List<TAgentTaskRequest> agentTaskRequests = new
LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
agentTaskRequests.add(toAgentTaskRequest(task));
- }
- client.submitTasks(agentTaskRequests);
- if (LOG.isDebugEnabled()) {
- for (AgentTask task : tasks) {
- LOG.debug("send task: type[{}], backend[{}],
signature[{}]",
- task.getTaskType(), backendId,
task.getSignature());
+ if (agentTaskRequests.size() >= batchSize) {
+ submitTasks(backendId, client, agentTaskRequests);
+ agentTaskRequests.clear();
}
}
+ submitTasks(backendId, client, agentTaskRequests);
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
@@ -194,6 +201,19 @@ public class AgentBatchTask implements Runnable {
} // end for backend
}
+ private static void submitTasks(long backendId,
+ BackendService.Client client, List<TAgentTaskRequest>
agentTaskRequests) throws TException {
+ if (!agentTaskRequests.isEmpty()) {
+ client.submitTasks(agentTaskRequests);
+ }
+ if (LOG.isDebugEnabled()) {
+ for (TAgentTaskRequest req : agentTaskRequests) {
+ LOG.debug("send task: type[{}], backend[{}], signature[{}]",
+ req.getTaskType(), backendId, req.getSignature());
+ }
+ }
+ }
+
private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]