This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 86bb03435e1 [fix](task) Abort creating replica task if sending RPC failed #42276 (#42961)
86bb03435e1 is described below

commit 86bb03435e14885b7ff3ae4ce2cc5017b218f2fd
Author: walter <w41te...@gmail.com>
AuthorDate: Thu Oct 31 14:40:20 2024 +0800

    [fix](task) Abort creating replica task if sending RPC failed #42276 (#42961)
    
    cherry pick from #42276
---
 .../org/apache/doris/alter/SchemaChangeJobV2.java    |  2 +-
 .../java/org/apache/doris/backup/RestoreJob.java     | 11 ++++++++---
 .../apache/doris/common/MarkedCountDownLatch.java    | 14 ++++++++++++++
 .../java/org/apache/doris/task/AgentBatchTask.java   | 20 ++++++++++----------
 .../main/java/org/apache/doris/task/AgentTask.java   |  4 ++++
 .../org/apache/doris/task/CreateReplicaTask.java     | 17 +++++++++++++++++
 6 files changed, 54 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 5f8fe8660b5..bce1bcf4692 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -354,7 +354,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 ok = false;
             }
 
-            if (!ok) {
+            if (!ok || !countDownLatch.getStatus().ok()) {
                 // create replicas failed. just cancel the job
                 // clear tasks and show the failed replicas to user
                 AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index c287ca78038..f2ce3859c4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -979,7 +979,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
             ok = true;
         }
 
-        if (ok) {
+        if (ok && latch.getStatus().ok()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("finished to create all restored replicas. {}", 
this);
             }
@@ -1043,8 +1043,13 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                     .map(item -> "(backendId = " + item.getKey() + ", tabletId 
= "  + item.getValue() + ")")
                     .collect(Collectors.toList());
             String idStr = Joiner.on(", ").join(subList);
-            status = new Status(ErrCode.COMMON_ERROR,
-                    "Failed to create replicas for restore. unfinished marks: 
" + idStr);
+            String reason = "TIMEDOUT";
+            if (!latch.getStatus().ok()) {
+                reason = latch.getStatus().getErrorMsg();
+            }
+            String errMsg = String.format(
+                    "Failed to create replicas for restore: %s, unfinished 
marks: %s", reason, idStr);
+            status = new Status(ErrCode.COMMON_ERROR, errMsg);
             return;
         }
         LOG.info("finished to prepare meta. {}", this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
index 14641d501d7..e1431c4d729 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
@@ -47,6 +47,20 @@ public class MarkedCountDownLatch<K, V> extends 
CountDownLatch {
         return false;
     }
 
+    public synchronized boolean markedCountDownWithStatus(K key, V value, 
Status status) {
+        // update status first before countDown.
+        // so that the waiting thread will get the correct status.
+        if (st.ok()) {
+            st = status;
+        }
+
+        if (marks.remove(key, value)) {
+            super.countDown();
+            return true;
+        }
+        return false;
+    }
+
     public synchronized List<Entry<K, V>> getLeftMarks() {
         return Lists.newArrayList(marks.entries());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 142395d9d9e..bf73f9b83fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -157,9 +157,11 @@ public class AgentBatchTask implements Runnable {
             BackendService.Client client = null;
             TNetworkAddress address = null;
             boolean ok = false;
+            String errMsg = "";
             try {
                 Backend backend = 
Env.getCurrentSystemInfo().getBackend(backendId);
                 if (backend == null || !backend.isAlive()) {
+                    errMsg = String.format("backend %d is not alive", 
backendId);
                     continue;
                 }
                 List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
@@ -169,30 +171,28 @@ public class AgentBatchTask implements Runnable {
                 client = ClientPool.backendPool.borrowObject(address);
                 List<TAgentTaskRequest> agentTaskRequests = new 
LinkedList<TAgentTaskRequest>();
                 for (AgentTask task : tasks) {
-                    try {
-                        agentTaskRequests.add(toAgentTaskRequest(task));
-                    } catch (Exception e) {
-                        task.failed();
-                        throw e;
-                    }
+                    agentTaskRequests.add(toAgentTaskRequest(task));
                 }
                 client.submitTasks(agentTaskRequests);
                 if (LOG.isDebugEnabled()) {
                     for (AgentTask task : tasks) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("send task: type[{}], backend[{}], 
signature[{}]",
-                                    task.getTaskType(), backendId, 
task.getSignature());
-                        }
+                        LOG.debug("send task: type[{}], backend[{}], 
signature[{}]",
+                                task.getTaskType(), backendId, 
task.getSignature());
                     }
                 }
                 ok = true;
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backendId, e);
+                errMsg = String.format("task exec error: %s. backend[%d]", 
e.getMessage(), backendId);
             } finally {
                 if (ok) {
                     ClientPool.backendPool.returnObject(address, client);
                 } else {
                     ClientPool.backendPool.invalidateObject(address, client);
+                    List<AgentTask> tasks = 
this.backendIdToTasks.get(backendId);
+                    for (AgentTask task : tasks) {
+                        task.failedWithMsg(errMsg);
+                    }
                 }
             }
         } // end for backend
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
index f878fc521f5..0ba998b3808 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
@@ -110,6 +110,10 @@ public abstract class AgentTask {
         ++this.failedTimes;
     }
 
+    public void failedWithMsg(String errMsg) {
+        failed();
+    }
+
     public int getFailedTimes() {
         return this.failedTimes;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 10baf65a838..c410f37e5c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -235,6 +235,23 @@ public class CreateReplicaTask extends AgentTask {
         }
     }
 
+    @Override
+    public void failedWithMsg(String errMsg) {
+        super.failedWithMsg(errMsg);
+
+        // CreateReplicaTask will not trigger a retry in ReportTask. 
Therefore, it needs to
+        // be marked as failed here and all threads waiting for the result of
+        // CreateReplicaTask need to be awakened.
+        if (this.latch != null) {
+            Status s = new Status(TStatusCode.CANCELLED, errMsg);
+            latch.markedCountDownWithStatus(getBackendId(), getTabletId(), s);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("CreateReplicaTask failed with msg: {}, tablet: {}, 
backend: {}",
+                        errMsg, getTabletId(), getBackendId());
+            }
+        }
+    }
+
     public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
         this.latch = latch;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to