This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 86bb03435e1 [fix](task) Abort creating replica task if sending RPC failed #42276 (#42961) 86bb03435e1 is described below commit 86bb03435e14885b7ff3ae4ce2cc5017b218f2fd Author: walter <w41te...@gmail.com> AuthorDate: Thu Oct 31 14:40:20 2024 +0800 [fix](task) Abort creating replica task if sending RPC failed #42276 (#42961) cherry pick from #42276 --- .../org/apache/doris/alter/SchemaChangeJobV2.java | 2 +- .../java/org/apache/doris/backup/RestoreJob.java | 11 ++++++++--- .../apache/doris/common/MarkedCountDownLatch.java | 14 ++++++++++++++ .../java/org/apache/doris/task/AgentBatchTask.java | 20 ++++++++++---------- .../main/java/org/apache/doris/task/AgentTask.java | 4 ++++ .../org/apache/doris/task/CreateReplicaTask.java | 17 +++++++++++++++++ 6 files changed, 54 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 5f8fe8660b5..bce1bcf4692 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -354,7 +354,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { ok = false; } - if (!ok) { + if (!ok || !countDownLatch.getStatus().ok()) { // create replicas failed. 
just cancel the job // clear tasks and show the failed replicas to user AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index c287ca78038..f2ce3859c4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -979,7 +979,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { ok = true; } - if (ok) { + if (ok && latch.getStatus().ok()) { if (LOG.isDebugEnabled()) { LOG.debug("finished to create all restored replicas. {}", this); } @@ -1043,8 +1043,13 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { .map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")") .collect(Collectors.toList()); String idStr = Joiner.on(", ").join(subList); - status = new Status(ErrCode.COMMON_ERROR, - "Failed to create replicas for restore. unfinished marks: " + idStr); + String reason = "TIMEDOUT"; + if (!latch.getStatus().ok()) { + reason = latch.getStatus().getErrorMsg(); + } + String errMsg = String.format( + "Failed to create replicas for restore: %s, unfinished marks: %s", reason, idStr); + status = new Status(ErrCode.COMMON_ERROR, errMsg); return; } LOG.info("finished to prepare meta. 
{}", this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java index 14641d501d7..e1431c4d729 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java @@ -47,6 +47,20 @@ public class MarkedCountDownLatch<K, V> extends CountDownLatch { return false; } + public synchronized boolean markedCountDownWithStatus(K key, V value, Status status) { + // update status first before countDown. + // so that the waiting thread will get the correct status. + if (st.ok()) { + st = status; + } + + if (marks.remove(key, value)) { + super.countDown(); + return true; + } + return false; + } + public synchronized List<Entry<K, V>> getLeftMarks() { return Lists.newArrayList(marks.entries()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index 142395d9d9e..bf73f9b83fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -157,9 +157,11 @@ public class AgentBatchTask implements Runnable { BackendService.Client client = null; TNetworkAddress address = null; boolean ok = false; + String errMsg = ""; try { Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); if (backend == null || !backend.isAlive()) { + errMsg = String.format("backend %d is not alive", backendId); continue; } List<AgentTask> tasks = this.backendIdToTasks.get(backendId); @@ -169,30 +171,28 @@ public class AgentBatchTask implements Runnable { client = ClientPool.backendPool.borrowObject(address); List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>(); for (AgentTask task : tasks) { - try { - agentTaskRequests.add(toAgentTaskRequest(task)); - } catch (Exception e) 
{ - task.failed(); - throw e; - } + agentTaskRequests.add(toAgentTaskRequest(task)); } client.submitTasks(agentTaskRequests); if (LOG.isDebugEnabled()) { for (AgentTask task : tasks) { - if (LOG.isDebugEnabled()) { - LOG.debug("send task: type[{}], backend[{}], signature[{}]", - task.getTaskType(), backendId, task.getSignature()); - } + LOG.debug("send task: type[{}], backend[{}], signature[{}]", + task.getTaskType(), backendId, task.getSignature()); } } ok = true; } catch (Exception e) { LOG.warn("task exec error. backend[{}]", backendId, e); + errMsg = String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId); } finally { if (ok) { ClientPool.backendPool.returnObject(address, client); } else { ClientPool.backendPool.invalidateObject(address, client); + List<AgentTask> tasks = this.backendIdToTasks.get(backendId); + for (AgentTask task : tasks) { + task.failedWithMsg(errMsg); + } } } } // end for backend diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java index f878fc521f5..0ba998b3808 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java @@ -110,6 +110,10 @@ public abstract class AgentTask { ++this.failedTimes; } + public void failedWithMsg(String errMsg) { + failed(); + } + public int getFailedTimes() { return this.failedTimes; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 10baf65a838..c410f37e5c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -235,6 +235,23 @@ public class CreateReplicaTask extends AgentTask { } } + @Override + public void failedWithMsg(String errMsg) { + super.failedWithMsg(errMsg); + + // CreateReplicaTask will not trigger 
a retry in ReportTask. Therefore, it needs to + // be marked as failed here and all threads waiting for the result of + // CreateReplicaTask need to be awakened. + if (this.latch != null) { + Status s = new Status(TStatusCode.CANCELLED, errMsg); + latch.markedCountDownWithStatus(getBackendId(), getTabletId(), s); + if (LOG.isDebugEnabled()) { + LOG.debug("CreateReplicaTask failed with msg: {}, tablet: {}, backend: {}", + errMsg, getTabletId(), getBackendId()); + } + } + } + public void setLatch(MarkedCountDownLatch<Long, Long> latch) { this.latch = latch; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org