This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e16003793 Rollback modification of FragmentInstanceDispatcherImpl.dispatchWriteSync (#7933)
7e16003793 is described below

commit 7e16003793bdd05bb3cfd9993b57040622cbc2ed
Author: Marcos_Zyk <[email protected]>
AuthorDate: Tue Nov 8 21:08:40 2022 +0800

    Rollback modification of FragmentInstanceDispatcherImpl.dispatchWriteSync (#7933)
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 58 +++-------------------
 1 file changed, 8 insertions(+), 50 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1a2d6e7991..1128b97341 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -48,9 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -116,60 +114,20 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   }
 
   private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {
-    List<Future<Throwable>> futureList = new ArrayList<>();
     for (FragmentInstance instance : instances) {
-      futureList.add(
-          writeOperationExecutor.submit(
-              () -> {
-                try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
-                  dispatchOneInstance(instance);
-                  return null;
-                } catch (Throwable t) {
-                  return t;
-                }
-              }));
-    }
-
-    List<Throwable> throwableList = new ArrayList<>();
-
-    for (Future<Throwable> future : futureList) {
-      try {
-        Throwable t = future.get();
-        if (t != null) {
-          throwableList.add(t);
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throwableList.add(e);
-        logger.error("[DispatchFailed]", e);
+      try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+        dispatchOneInstance(instance);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.error("[DispatchFailed]", t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
-                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + e.getMessage())));
-      }
-    }
-
-    if (throwableList.isEmpty()) {
-      return immediateFuture(new FragInstanceDispatchResult(true));
-    } else {
-      List<TSStatus> failureStatusList = new ArrayList<>(throwableList.size());
-      for (Throwable t : throwableList) {
-        if (t instanceof FragmentInstanceDispatchException) {
-          failureStatusList.add(((FragmentInstanceDispatchException) t).getFailureStatus());
-        } else {
-          logger.error("[DispatchFailed]", t);
-          return immediateFuture(
-              new FragInstanceDispatchResult(
-                  RpcUtils.getStatus(
-                      TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
-        }
-      }
-      if (failureStatusList.size() == 1) {
-        return immediateFuture(new FragInstanceDispatchResult(failureStatusList.get(0)));
-      } else {
-        return immediateFuture(
-            new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
       }
     }
+    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
   private void dispatchOneInstance(FragmentInstance instance)

Reply via email to