This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 1556e512e3 [To rel/1.0][IOTDB-5399] Collect complete dispatch result
for return #8964
1556e512e3 is described below
commit 1556e512e3892346085896a5c34873ed451f8e52
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Feb 2 17:00:54 2023 +0800
[To rel/1.0][IOTDB-5399] Collect complete dispatch result for return #8964
---
.../scheduler/FragmentInstanceDispatcherImpl.java | 29 ++++++++++++++++++----
1 file changed, 24 insertions(+), 5 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index bd78965ea7..c5a9e1283b 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -48,6 +48,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -114,20 +115,38 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
}
private Future<FragInstanceDispatchResult>
dispatchWriteSync(List<FragmentInstance> instances) {
+ List<TSStatus> failureStatusList = new ArrayList<>();
for (FragmentInstance instance : instances) {
try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
dispatchOneInstance(instance);
} catch (FragmentInstanceDispatchException e) {
- return immediateFuture(new
FragInstanceDispatchResult(e.getFailureStatus()));
+ TSStatus failureStatus = e.getFailureStatus();
+ if (instances.size() == 1) {
+ failureStatusList.add(failureStatus);
+ } else {
+ if (failureStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ failureStatusList.addAll(failureStatus.getSubStatus());
+ } else {
+ failureStatusList.add(failureStatus);
+ }
+ }
} catch (Throwable t) {
logger.warn("[DispatchFailed]", t);
+ failureStatusList.add(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " +
t.getMessage()));
+ }
+ }
+ if (failureStatusList.isEmpty()) {
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ } else {
+ if (instances.size() == 1) {
+ return immediateFuture(new
FragInstanceDispatchResult(failureStatusList.get(0)));
+ } else {
return immediateFuture(
- new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: "
+ t.getMessage())));
+ new
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
}
}
- return immediateFuture(new FragInstanceDispatchResult(true));
}
private void dispatchOneInstance(FragmentInstance instance)