This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch opitimize_query_terminator_in_local
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit af7974ef0e44df9bc55105ee469b7813df3c2076
Author: Beyyes <[email protected]>
AuthorDate: Wed May 24 10:20:13 2023 +0800

    Avoid RPC invocation in SimpleQueryTerminator when the endpoint is the local address
---
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 27 ++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 841abf4deae..be1712c6c6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -23,8 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -101,6 +104,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       if (unfinishedFIs.isEmpty()) {
         continue;
       }
+
+      String internalAddress = 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+      int internalPort = 
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+      if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+          && internalPort == endPoint.getPort()) {
+        for (TFragmentInstanceId insId : unfinishedFIs) {
+          FragmentInstanceManager.getInstance()
+              .cancelTask(FragmentInstanceId.fromThrift(insId), false);
+        }
+        return true;
+      }
+
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
         client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, 
false));
@@ -126,6 +141,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       if (unfinishedFIs.isEmpty()) {
         continue;
       }
+
+      String internalAddress = 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+      int internalPort = 
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+      if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+          && internalPort == endPoint.getPort()) {
+        for (TFragmentInstanceId insId : unfinishedFIs) {
+          FragmentInstanceManager.getInstance()
+              .cancelTask(FragmentInstanceId.fromThrift(insId), true);
+        }
+        return true;
+      }
+
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
         client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, 
true));

Reply via email to