This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 31ac1c3027 [IOTDB-5399] Collect complete dispatch result for return
(#8963)
31ac1c3027 is described below
commit 31ac1c3027a87762ce38a69d29c17724bba1ffa3
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Feb 2 17:01:24 2023 +0800
[IOTDB-5399] Collect complete dispatch result for return (#8963)
---
.../scheduler/FragmentInstanceDispatcherImpl.java | 29 ++++++++++++++++++----
1 file changed, 24 insertions(+), 5 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cedfdbb684..f299d862e9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -49,6 +49,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -119,20 +120,38 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
}
private Future<FragInstanceDispatchResult>
dispatchWriteSync(List<FragmentInstance> instances) {
+ List<TSStatus> failureStatusList = new ArrayList<>();
for (FragmentInstance instance : instances) {
try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
dispatchOneInstance(instance);
} catch (FragmentInstanceDispatchException e) {
- return immediateFuture(new
FragInstanceDispatchResult(e.getFailureStatus()));
+ TSStatus failureStatus = e.getFailureStatus();
+ if (instances.size() == 1) {
+ failureStatusList.add(failureStatus);
+ } else {
+ if (failureStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ failureStatusList.addAll(failureStatus.getSubStatus());
+ } else {
+ failureStatusList.add(failureStatus);
+ }
+ }
} catch (Throwable t) {
logger.warn("[DispatchFailed]", t);
+ failureStatusList.add(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " +
t.getMessage()));
+ }
+ }
+ if (failureStatusList.isEmpty()) {
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ } else {
+ if (instances.size() == 1) {
+ return immediateFuture(new
FragInstanceDispatchResult(failureStatusList.get(0)));
+ } else {
return immediateFuture(
- new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: "
+ t.getMessage())));
+ new
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
}
}
- return immediateFuture(new FragInstanceDispatchResult(true));
}
private void dispatchOneInstance(FragmentInstance instance)