This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
     new bfed374192 fix bug in session part
bfed374192 is described below

commit bfed37419283b13849767f80904abdc4b6e5332d
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Apr 24 15:07:17 2023 +0800

    fix bug in session part
---
 .../java/org/apache/iotdb/FastInsertExample.java   |  2 +-
 .../execution/executor/RegionWriteExecutor.java    |  2 --
 .../mpp/plan/analyze/schema/SchemaValidator.java   |  1 +
 .../planner/plan/node/write/FastInsertRowNode.java | 32 ++++++++++++++++++++++
 .../plan/node/write/FastInsertRowsNode.java        | 11 ++++++++
 .../planner/plan/node/write/InsertRowNode.java     |  2 +-
 .../java/org/apache/iotdb/session/Session.java     |  3 +-
 .../apache/iotdb/session/util/SessionUtils.java    |  1 +
 8 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
index 5e2a3a9efc..b24dd725f8 100644
--- a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
@@ -65,7 +65,7 @@ public class FastInsertExample {
     List<Long> timestamps = new ArrayList<>();
     List<List<TSDataType>> typesList = new ArrayList<>();
 
-    for (long time = 0; time < 500; time++) {
+    for (long time = 1000; time < 1500; time++) {
       List<Object> values = new ArrayList<>();
       List<TSDataType> types = new ArrayList<>();
       values.add(1L);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index f87669774d..c4f707702c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -257,8 +257,6 @@ public class RegionWriteExecutor {
           LOGGER.warn(partialInsertMessage);
         }
 
-        // TODO: (FAStWRITE) (侯昊男) 根据数据类型把 values 反序列化出来
-
         // TODO: (FAStWRITE) 然后再进入到这一步
         ConsensusWriteResponse writeResponse =
             fireTriggerAndInsert(context.getRegionId(), insertNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 2d6b10ec94..b9270d33f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -43,6 +43,7 @@ public class SchemaValidator {
         if (insertNode instanceof FastInsertRowsNode) {
           SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreateForFastWrite(
               ((BatchInsertNode) insertNode).getSchemaValidationList());
+          ((FastInsertRowsNode) insertNode).fillValues();
         } else {
           SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
               ((BatchInsertNode) insertNode).getSchemaValidationList());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index 80ea5de6a4..378d6a29c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -111,6 +111,10 @@ public class FastInsertRowNode extends InsertRowNode {
     this.rawValues = ByteBuffer.wrap(bytes);
   }
 
+  public boolean hasFailedMeasurements() {
+    return false;
+  }
+
   @Override
   public void initMeasurementSchemaContainer(int size, String[] measurements) {
     this.measurementSchemas = new MeasurementSchema[size];
@@ -131,4 +135,32 @@ public class FastInsertRowNode extends InsertRowNode {
     measurementSchemas[index] = measurementSchemaInfo.getSchema();
     this.dataTypes[index] = measurementSchemaInfo.getSchema().getType();
   }
+
+  public void fillValues() throws QueryProcessException {
+    this.values = new Object[measurements.length];
+    for (int i = 0; i < dataTypes.length; i++) {
+      switch (dataTypes[i]) {
+        case BOOLEAN:
+          values[i] = ReadWriteIOUtils.readBool(this.rawValues);
+          break;
+        case INT32:
+          values[i] = ReadWriteIOUtils.readInt(this.rawValues);
+          break;
+        case INT64:
+          values[i] = ReadWriteIOUtils.readLong(this.rawValues);
+          break;
+        case FLOAT:
+          values[i] = ReadWriteIOUtils.readFloat(this.rawValues);
+          break;
+        case DOUBLE:
+          values[i] = ReadWriteIOUtils.readDouble(this.rawValues);
+          break;
+        case TEXT:
+          values[i] = ReadWriteIOUtils.readBinary(this.rawValues);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index aaa3815f63..21066602a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
@@ -42,6 +43,16 @@ public class FastInsertRowsNode extends InsertRowsNode {
     super(id, insertRowNodeIndexList, fastInsertRowNodeList);
   }
 
+  public boolean hasFailedMeasurements() {
+    return false;
+  }
+
+  public void fillValues() throws QueryProcessException {
+    for (int i = 0; i < getInsertRowNodeList().size(); i++) {
+      ((FastInsertRowNode) getInsertRowNodeList().get(i)).fillValues();
+    }
+  }
+
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 74a35f5576..d4d883e202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -77,7 +77,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
   private long time;
 
   // TODO: (FASTWRITE) (侯昊男) 增加 byteBuffer 字段
-  private Object[] values;
+  protected Object[] values;
 
   private boolean isNeedInferType = false;
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index e39b23629f..822ba5c39e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -2480,8 +2480,7 @@ public class Session implements ISession {
       throws IoTDBConnectionException {
     request.addToPrefixPaths(deviceId);
     request.addToTimestamps(time);
-    ByteBuffer buffer =
-        ByteBuffer.allocate(SessionUtils.calculateLengthForFastInsert(types, values));
+    ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
     request.addToValuesList(buffer);
   }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 5f55031b4f..8002773918 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -79,6 +79,7 @@ public class SessionUtils {
   public static ByteBuffer getValueBuffer(List<TSDataType> types, List<Object> values)
       throws IoTDBConnectionException {
     ByteBuffer buffer = ByteBuffer.allocate(SessionUtils.calculateLength(types, values));
+    SessionUtils.putValues(types, values, buffer);
     return buffer;
   }
 

Reply via email to