This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 1556e512e3 [To rel/1.0][IOTDB-5399] Collect complete dispatch result 
for return #8964
1556e512e3 is described below

commit 1556e512e3892346085896a5c34873ed451f8e52
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Feb 2 17:00:54 2023 +0800

    [To rel/1.0][IOTDB-5399] Collect complete dispatch result for return #8964
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 29 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index bd78965ea7..c5a9e1283b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -48,6 +48,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -114,20 +115,38 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
   }
 
   private Future<FragInstanceDispatchResult> 
dispatchWriteSync(List<FragmentInstance> instances) {
+    List<TSStatus> failureStatusList = new ArrayList<>();
     for (FragmentInstance instance : instances) {
       try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
         dispatchOneInstance(instance);
       } catch (FragmentInstanceDispatchException e) {
-        return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
+        TSStatus failureStatus = e.getFailureStatus();
+        if (instances.size() == 1) {
+          failureStatusList.add(failureStatus);
+        } else {
+          if (failureStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+            failureStatusList.addAll(failureStatus.getSubStatus());
+          } else {
+            failureStatusList.add(failureStatus);
+          }
+        }
       } catch (Throwable t) {
         logger.warn("[DispatchFailed]", t);
+        failureStatusList.add(
+            RpcUtils.getStatus(
+                TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + 
t.getMessage()));
+      }
+    }
+    if (failureStatusList.isEmpty()) {
+      return immediateFuture(new FragInstanceDispatchResult(true));
+    } else {
+      if (instances.size() == 1) {
+        return immediateFuture(new 
FragInstanceDispatchResult(failureStatusList.get(0)));
+      } else {
         return immediateFuture(
-            new FragInstanceDispatchResult(
-                RpcUtils.getStatus(
-                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " 
+ t.getMessage())));
+            new 
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
       }
     }
-    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
   private void dispatchOneInstance(FragmentInstance instance)

Reply via email to