This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
new bfed374192 fix bug in session part
bfed374192 is described below
commit bfed37419283b13849767f80904abdc4b6e5332d
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Apr 24 15:07:17 2023 +0800
fix bug in session part
---
.../java/org/apache/iotdb/FastInsertExample.java | 2 +-
.../execution/executor/RegionWriteExecutor.java | 2 --
.../mpp/plan/analyze/schema/SchemaValidator.java | 1 +
.../planner/plan/node/write/FastInsertRowNode.java | 32 ++++++++++++++++++++++
.../plan/node/write/FastInsertRowsNode.java | 11 ++++++++
.../planner/plan/node/write/InsertRowNode.java | 2 +-
.../java/org/apache/iotdb/session/Session.java | 3 +-
.../apache/iotdb/session/util/SessionUtils.java | 1 +
8 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
index 5e2a3a9efc..b24dd725f8 100644
--- a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
@@ -65,7 +65,7 @@ public class FastInsertExample {
List<Long> timestamps = new ArrayList<>();
List<List<TSDataType>> typesList = new ArrayList<>();
- for (long time = 0; time < 500; time++) {
+ for (long time = 1000; time < 1500; time++) {
List<Object> values = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
values.add(1L);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index f87669774d..c4f707702c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -257,8 +257,6 @@ public class RegionWriteExecutor {
LOGGER.warn(partialInsertMessage);
}
- // TODO: (FAStWRITE) (侯昊男) 根据数据类型把 values 反序列化出来
-
// TODO: (FAStWRITE) 然后再进入到这一步
ConsensusWriteResponse writeResponse =
fireTriggerAndInsert(context.getRegionId(), insertNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 2d6b10ec94..b9270d33f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -43,6 +43,7 @@ public class SchemaValidator {
if (insertNode instanceof FastInsertRowsNode) {
SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreateForFastWrite(
((BatchInsertNode) insertNode).getSchemaValidationList());
+ ((FastInsertRowsNode) insertNode).fillValues();
} else {
SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
((BatchInsertNode) insertNode).getSchemaValidationList());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index 80ea5de6a4..378d6a29c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -111,6 +111,10 @@ public class FastInsertRowNode extends InsertRowNode {
this.rawValues = ByteBuffer.wrap(bytes);
}
+ public boolean hasFailedMeasurements() {
+ return false;
+ }
+
@Override
public void initMeasurementSchemaContainer(int size, String[] measurements) {
this.measurementSchemas = new MeasurementSchema[size];
@@ -131,4 +135,32 @@ public class FastInsertRowNode extends InsertRowNode {
measurementSchemas[index] = measurementSchemaInfo.getSchema();
this.dataTypes[index] = measurementSchemaInfo.getSchema().getType();
}
+
+ public void fillValues() throws QueryProcessException {
+ this.values = new Object[measurements.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ values[i] = ReadWriteIOUtils.readBool(this.rawValues);
+ break;
+ case INT32:
+ values[i] = ReadWriteIOUtils.readInt(this.rawValues);
+ break;
+ case INT64:
+ values[i] = ReadWriteIOUtils.readLong(this.rawValues);
+ break;
+ case FLOAT:
+ values[i] = ReadWriteIOUtils.readFloat(this.rawValues);
+ break;
+ case DOUBLE:
+ values[i] = ReadWriteIOUtils.readDouble(this.rawValues);
+ break;
+ case TEXT:
+ values[i] = ReadWriteIOUtils.readBinary(this.rawValues);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index aaa3815f63..21066602a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
@@ -42,6 +43,16 @@ public class FastInsertRowsNode extends InsertRowsNode {
super(id, insertRowNodeIndexList, fastInsertRowNodeList);
}
+ public boolean hasFailedMeasurements() {
+ return false;
+ }
+
+ public void fillValues() throws QueryProcessException {
+ for (int i = 0; i < getInsertRowNodeList().size(); i++) {
+ ((FastInsertRowNode) getInsertRowNodeList().get(i)).fillValues();
+ }
+ }
+
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 74a35f5576..d4d883e202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -77,7 +77,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
private long time;
// TODO: (FASTWRITE) (侯昊男) 增加 byteBuffer 字段
- private Object[] values;
+ protected Object[] values;
private boolean isNeedInferType = false;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index e39b23629f..822ba5c39e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -2480,8 +2480,7 @@ public class Session implements ISession {
throws IoTDBConnectionException {
request.addToPrefixPaths(deviceId);
request.addToTimestamps(time);
- ByteBuffer buffer =
- ByteBuffer.allocate(SessionUtils.calculateLengthForFastInsert(types, values));
+ ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
request.addToValuesList(buffer);
}
diff --git a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 5f55031b4f..8002773918 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -79,6 +79,7 @@ public class SessionUtils {
public static ByteBuffer getValueBuffer(List<TSDataType> types, List<Object> values)
throws IoTDBConnectionException {
ByteBuffer buffer = ByteBuffer.allocate(SessionUtils.calculateLength(types, values));
+ SessionUtils.putValues(types, values, buffer);
return buffer;
}