This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch support_table_model_redirect
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e025251a83ad22d2c287b72497776bc876e993d6
Author: HTHou <[email protected]>
AuthorDate: Thu Sep 19 14:34:40 2024 +0800

    dev_server_side
---
 .../db/queryengine/plan/analyze/Analysis.java      |  1 +
 .../db/queryengine/plan/analyze/IAnalysis.java     |  2 +
 .../planner/plan/node/write/InsertTabletNode.java  | 10 ++---
 .../node/write/RelationalInsertTabletNode.java     | 43 ++++++++++++++++++++++
 .../plan/relational/analyzer/Analysis.java         |  5 +++
 .../plan/relational/planner/TableModelPlanner.java | 39 +++++++++++++++++++-
 6 files changed, 94 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 11e8e42ec83..1f8905cc666 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -414,6 +414,7 @@ public class Analysis implements IAnalysis {
     this.schemaTree = schemaTree;
   }
 
+  @Override
   public List<TEndPoint> getRedirectNodeList() {
     return redirectNodeList;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index 05c3638a95d..e03f75db230 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -66,6 +66,8 @@ public interface IAnalysis {
 
   DataPartition getDataPartitionInfo();
 
+  List<TEndPoint> getRedirectNodeList();
+
   void setRedirectNodeList(List<TEndPoint> redirectNodeList);
 
   void addEndPointToRedirectNodeList(TEndPoint endPoint);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index d40d1c50b84..3a2027d1927 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -248,7 +248,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     return deviceIDSplitInfoMap;
   }
 
-  private Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
+  protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
       Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis 
analysis) {
     Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
 
@@ -1189,12 +1189,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     return deviceID;
   }
 
-  private static class PartitionSplitInfo {
+  protected static class PartitionSplitInfo {
 
     // for each List in split, they are range1.start, range1.end, 
range2.start, range2.end, ...
-    private List<Integer> ranges = new ArrayList<>();
-    private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
-    private List<TRegionReplicaSet> replicaSets;
+    List<Integer> ranges = new ArrayList<>();
+    List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+    List<TRegionReplicaSet> replicaSets;
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index c2129f8d8d7..b6b97457cff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -41,7 +44,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.IntToLongFunction;
 
 public class RelationalInsertTabletNode extends InsertTabletNode {
@@ -139,6 +144,44 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
         columnCategories);
   }
 
+  protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
+      Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis 
analysis) {
+    Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
+    Map<IDeviceID, TEndPoint> endPointMap = new HashMap<>();
+
+    for (Map.Entry<IDeviceID, PartitionSplitInfo> entry : 
deviceIDSplitInfoMap.entrySet()) {
+      final IDeviceID deviceID = entry.getKey();
+      final PartitionSplitInfo splitInfo = entry.getValue();
+      final List<TRegionReplicaSet> replicaSets =
+          analysis
+              .getDataPartitionInfo()
+              .getDataRegionReplicaSetForWriting(
+                  deviceID, splitInfo.timePartitionSlots, 
analysis.getDatabaseName());
+      splitInfo.replicaSets = replicaSets;
+      // collect redirectInfo
+      endPointMap.put(
+          deviceID,
+          replicaSets
+              .get(replicaSets.size() - 1)
+              .getDataNodeLocations()
+              .get(0)
+              .getClientRpcEndPoint());
+      for (int i = 0; i < replicaSets.size(); i++) {
+        List<Integer> subRanges =
+            splitMap.computeIfAbsent(replicaSets.get(i), x -> new 
ArrayList<>());
+        subRanges.add(splitInfo.ranges.get(2 * i));
+        subRanges.add(splitInfo.ranges.get(2 * i + 1));
+      }
+    }
+    List<TEndPoint> redirectNodeList = new ArrayList<>(times.length);
+    for (int i = 0; i < times.length; i++) {
+      IDeviceID deviceId = getDeviceID(i);
+      redirectNodeList.add(endPointMap.get(deviceId));
+    }
+    analysis.setRedirectNodeList(redirectNodeList);
+    return splitMap;
+  }
+
   public static RelationalInsertTabletNode deserialize(ByteBuffer byteBuffer) {
     RelationalInsertTabletNode insertNode = new RelationalInsertTabletNode(new 
PlanNodeId(""));
     insertNode.subDeserialize(byteBuffer);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index bb1678f6598..774f7576a48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -750,6 +750,11 @@ public class Analysis implements IAnalysis {
     }
   }
 
+  @Override
+  public List<TEndPoint> getRedirectNodeList() {
+    return redirectNodeList;
+  }
+
   @Override
   public void setRedirectNodeList(List<TEndPoint> redirectNodeList) {
     this.redirectNodeList = redirectNodeList;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index e750e2d95a8..762939c7c74 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -38,12 +38,18 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
 import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -149,7 +155,38 @@ public class TableModelPlanner implements IPlanner {
 
   @Override
   public void setRedirectInfo(
-      IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode) {}
+      IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode) {
+    Analysis analysis = (Analysis) iAnalysis;
+
+    assert analysis.getStatement() instanceof WrappedInsertStatement;
+    InsertBaseStatement insertStatement =
+        ((WrappedInsertStatement) 
analysis.getStatement()).getInnerTreeStatement();
+
+    if (!analysis.isFinishQueryAfterAnalyze()) {
+      // Table Model Session only supports insertTablet
+      if (insertStatement instanceof InsertTabletStatement) {
+        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+          boolean needRedirect = false;
+          List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
+          List<TSStatus> subStatus = new ArrayList<>(redirectNodeList.size());
+          for (TEndPoint endPoint : redirectNodeList) {
+            // redirect writing only if the redirectEndPoint is not the 
current node
+            if (!localEndPoint.equals(endPoint)) {
+              subStatus.add(
+                  
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+              needRedirect = true;
+            } else {
+              subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+            }
+          }
+          if (needRedirect) {
+            
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+            tsstatus.setSubStatus(subStatus);
+          }
+        }
+      }
+    }
+  }
 
   private static class NopAccessControl implements AccessControl {}
 }

Reply via email to