This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new feb61e8d83a Pipe: Fix InsertRowsNode is not supported in batch mode (#12517)
feb61e8d83a is described below

commit feb61e8d83a8e4ef3e995690de5b6b29185067e2
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 13 18:39:58 2024 +0800

    Pipe: Fix InsertRowsNode is not supported in batch mode (#12517)
---
 .../request/PipeTransferTabletBatchReq.java        | 23 ++++++---
 .../request/PipeTransferTabletBinaryReq.java       | 57 +++++++---------------
 .../request/PipeTransferTabletInsertNodeReq.java   | 13 ++---
 3 files changed, 39 insertions(+), 54 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 73e1d2b82c5..8090f650489 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -70,6 +70,9 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
         insertRowStatementList.add((InsertRowStatement) statement);
       } else if (statement instanceof InsertTabletStatement) {
         insertTabletStatementList.add((InsertTabletStatement) statement);
+      } else if (statement instanceof InsertRowsStatement) {
+        insertRowStatementList.addAll(
+            ((InsertRowsStatement) statement).getInsertRowStatementList());
       } else {
         throw new UnsupportedOperationException(
             String.format(
@@ -87,11 +90,14 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
         insertRowStatementList.add((InsertRowStatement) statement);
       } else if (statement instanceof InsertTabletStatement) {
         insertTabletStatementList.add((InsertTabletStatement) statement);
+      } else if (statement instanceof InsertRowsStatement) {
+        insertRowStatementList.addAll(
+            ((InsertRowsStatement) statement).getInsertRowStatementList());
       } else {
         throw new UnsupportedOperationException(
             String.format(
-                "unknown InsertBaseStatement %s constructed from 
PipeTransferTabletInsertNodeReq.",
-                insertNodeReq));
+                "Unknown InsertBaseStatement %s constructed from 
PipeTransferTabletInsertNodeReq.",
+                statement));
       }
     }
 
@@ -111,9 +117,9 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletBatchReq toTPipeTransferReq(
-      List<ByteBuffer> binaryBuffers,
-      List<ByteBuffer> insertNodeBuffers,
-      List<ByteBuffer> tabletBuffers)
+      final List<ByteBuffer> binaryBuffers,
+      final List<ByteBuffer> insertNodeBuffers,
+      final List<ByteBuffer> tabletBuffers)
       throws IOException {
     final PipeTransferTabletBatchReq batchReq = new 
PipeTransferTabletBatchReq();
 
@@ -147,7 +153,8 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
     return batchReq;
   }
 
-  public static PipeTransferTabletBatchReq 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+  public static PipeTransferTabletBatchReq fromTPipeTransferReq(
+      final TPipeTransferReq transferReq) {
     final PipeTransferTabletBatchReq batchReq = new 
PipeTransferTabletBatchReq();
 
     int size = ReadWriteIOUtils.readInt(transferReq.body);
@@ -200,14 +207,14 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
   /////////////////////////////// Object ///////////////////////////////
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
+    final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
     return binaryReqs.equals(that.binaryReqs)
         && insertNodeReqs.equals(that.insertNodeReqs)
         && tabletReqs.equals(that.tabletReqs)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
index ae8da894347..5e9e0a39103 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -51,41 +51,17 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
   public InsertBaseStatement constructStatement() {
     final InsertNode insertNode = parseByteBuffer();
 
-    if (insertNode instanceof InsertRowNode) {
-      final InsertRowNode node = (InsertRowNode) insertNode;
-
-      final InsertRowStatement statement = new InsertRowStatement();
-      statement.setDevicePath(node.getDevicePath());
-      statement.setTime(node.getTime());
-      statement.setMeasurements(node.getMeasurements());
-      statement.setDataTypes(node.getDataTypes());
-      statement.setValues(node.getValues());
-      statement.setNeedInferType(node.isNeedInferType());
-      statement.setAligned(node.isAligned());
-      statement.setMeasurementSchemas(node.getMeasurementSchemas());
-      return statement;
+    if (!(insertNode instanceof InsertRowNode
+        || insertNode instanceof InsertTabletNode
+        || insertNode instanceof InsertRowsNode)) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unknown InsertNode type %s when constructing statement from 
insert node.",
+              insertNode));
     }
 
-    if (insertNode instanceof InsertTabletNode) {
-      final InsertTabletNode node = (InsertTabletNode) insertNode;
-
-      final InsertTabletStatement statement = new InsertTabletStatement();
-      statement.setDevicePath(node.getDevicePath());
-      statement.setMeasurements(node.getMeasurements());
-      statement.setTimes(node.getTimes());
-      statement.setColumns(node.getColumns());
-      statement.setBitMaps(node.getBitMaps());
-      statement.setRowCount(node.getRowCount());
-      statement.setDataTypes(node.getDataTypes());
-      statement.setAligned(node.isAligned());
-      statement.setMeasurementSchemas(node.getMeasurementSchemas());
-      return statement;
-    }
-
-    throw new UnsupportedOperationException(
-        String.format(
-            "unknown InsertNode type %s when constructing statement from 
insert node.",
-            insertNode));
+    return (InsertBaseStatement)
+        IoTDBDataNodeReceiver.PLAN_TO_STATEMENT_VISITOR.process(insertNode, 
null);
   }
 
   private InsertNode parseByteBuffer() {
@@ -95,7 +71,7 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferTabletBinaryReq toTPipeTransferReq(ByteBuffer 
byteBuffer) {
+  public static PipeTransferTabletBinaryReq toTPipeTransferReq(final 
ByteBuffer byteBuffer) {
     final PipeTransferTabletBinaryReq req = new PipeTransferTabletBinaryReq();
     req.byteBuffer = byteBuffer;
 
@@ -106,7 +82,8 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
     return req;
   }
 
-  public static PipeTransferTabletBinaryReq 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+  public static PipeTransferTabletBinaryReq fromTPipeTransferReq(
+      final TPipeTransferReq transferReq) {
     final PipeTransferTabletBinaryReq binaryReq = new 
PipeTransferTabletBinaryReq();
     binaryReq.byteBuffer = transferReq.body;
 
@@ -119,7 +96,7 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
 
   /////////////////////////////// Air Gap ///////////////////////////////
 
-  public static byte[] toTPipeTransferBytes(ByteBuffer byteBuffer) throws 
IOException {
+  public static byte[] toTPipeTransferBytes(final ByteBuffer byteBuffer) 
throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
@@ -131,14 +108,14 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
   /////////////////////////////// Object ///////////////////////////////
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferTabletBinaryReq that = (PipeTransferTabletBinaryReq) obj;
+    final PipeTransferTabletBinaryReq that = (PipeTransferTabletBinaryReq) obj;
     return byteBuffer.equals(that.byteBuffer)
         && version == that.version
         && type == that.type
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index 7fad79e843e..c45417ba99d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -66,7 +66,7 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
 
   /////////////////////////////// WriteBack & Batch 
///////////////////////////////
 
-  public static PipeTransferTabletInsertNodeReq 
toTPipeTransferRawReq(InsertNode insertNode) {
+  public static PipeTransferTabletInsertNodeReq toTPipeTransferRawReq(final 
InsertNode insertNode) {
     final PipeTransferTabletInsertNodeReq req = new 
PipeTransferTabletInsertNodeReq();
 
     req.insertNode = insertNode;
@@ -76,7 +76,7 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(InsertNode 
insertNode) {
+  public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(final 
InsertNode insertNode) {
     final PipeTransferTabletInsertNodeReq req = new 
PipeTransferTabletInsertNodeReq();
 
     req.insertNode = insertNode;
@@ -88,7 +88,8 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
     return req;
   }
 
-  public static PipeTransferTabletInsertNodeReq 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+  public static PipeTransferTabletInsertNodeReq fromTPipeTransferReq(
+      final TPipeTransferReq transferReq) {
     final PipeTransferTabletInsertNodeReq insertNodeReq = new 
PipeTransferTabletInsertNodeReq();
 
     insertNodeReq.insertNode = (InsertNode) 
PlanNodeType.deserialize(transferReq.body);
@@ -101,7 +102,7 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
-  public static byte[] toTPipeTransferBytes(InsertNode insertNode) throws 
IOException {
+  public static byte[] toTPipeTransferBytes(final InsertNode insertNode) 
throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
@@ -114,14 +115,14 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
   /////////////////////////////// Object ///////////////////////////////
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferTabletInsertNodeReq that = (PipeTransferTabletInsertNodeReq) 
obj;
+    final PipeTransferTabletInsertNodeReq that = 
(PipeTransferTabletInsertNodeReq) obj;
     return Objects.equals(insertNode, that.insertNode)
         && version == that.version
         && type == that.type

Reply via email to