This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch fast_write_test_with_guoneng in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b9cae50e03960f04eff7ec742f73032aad6a6f92 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Apr 24 16:34:10 2023 +0800 3c3d ok without IoTConsensus --- .../java/org/apache/iotdb/FastInsertExample.java | 93 ++++++++++++++++++++++ .../main/java/org/apache/iotdb/PrepareWrite.java | 31 ++++++++ .../src/main/java/org/apache/iotdb/WriteTest.java | 5 +- .../org/apache/iotdb/WriteTestFixParallel.java | 26 +++--- .../apache/iotdb/isession/pool/ISessionPool.java | 7 ++ .../mpp/plan/planner/plan/node/PlanNodeType.java | 5 +- .../plan/node/write/FastInsertRowsNode.java | 45 +++++++++++ .../planner/plan/node/write/InsertRowsNode.java | 4 +- .../org/apache/iotdb/session/pool/SessionPool.java | 24 ++++++ 9 files changed, 221 insertions(+), 19 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java new file mode 100644 index 0000000000..835794171f --- /dev/null +++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb; + +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import java.util.ArrayList; +import java.util.List; + +@SuppressWarnings("squid:S106") +public class FastInsertExample { + + private static Session session; + private static Session sessionEnableRedirect; + private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1"; + private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2"; + private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3"; + private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4"; + private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5"; + private static final String ROOT_SG1_D1 = "root.sg1.d1"; + private static final String LOCAL_HOST = "192.168.130.16"; + + public static void main(String[] args) + throws IoTDBConnectionException, StatementExecutionException { + session = + new Session.Builder() + .host(LOCAL_HOST) + .port(6667) + .username("root") + .password("root") + .version(Version.V_1_0) + .build(); + session.open(false); + + fastInsertRecords(); + session.close(); + } + + private static void fastInsertRecords() + throws IoTDBConnectionException, StatementExecutionException { + String deviceId = ROOT_SG1_D1; + List<String> deviceIds = new ArrayList<>(); + List<List<Object>> valuesList = new ArrayList<>(); + List<Long> timestamps = new ArrayList<>(); + List<List<TSDataType>> typesList = new ArrayList<>(); + + for (long time = 0; time < 1; time++) { + List<Object> values = new ArrayList<>(); + List<TSDataType> types = new ArrayList<>(); + values.add(100L); + values.add(200L); + values.add(300L); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + + deviceIds.add(deviceId); + valuesList.add(values); + typesList.add(types); + timestamps.add(time); + if (time != 0 && time % 100 == 0) { + session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList); + deviceIds.clear(); + valuesList.clear(); + typesList.clear(); + timestamps.clear(); + } + } + + session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList); + } +} diff --git a/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java b/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java new file mode 100644 index 0000000000..be8170878b --- /dev/null +++ b/example/session/src/main/java/org/apache/iotdb/PrepareWrite.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb; + +public class PrepareWrite { + public static void main(String args[]) { + StringBuilder s = new StringBuilder("create schema template t1 (s0 FLOAT encoding=RLE"); + for (int i = 1; i < 500; i++) { + s.append(", s").append(i).append(" FLOAT encoding=RLE"); + } + s.append(")"); + System.out.println(s); + } +} diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTest.java b/example/session/src/main/java/org/apache/iotdb/WriteTest.java index 343ae9797e..7dc9c5029d 100644 --- a/example/session/src/main/java/org/apache/iotdb/WriteTest.java +++ b/example/session/src/main/java/org/apache/iotdb/WriteTest.java @@ -143,6 +143,7 @@ public class WriteTest { } public static void main(String[] args) throws InterruptedException { + // Choose the SessionPool you going to use constructRedirectSessionPool(); @@ -204,7 +205,6 @@ public class WriteTest { throws StatementExecutionException, IoTDBConnectionException { List<String> deviceIds = new ArrayList<>(); List<Long> times = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); List<List<TSDataType>> typesList = new ArrayList<>(); List<List<Object>> valuesList = new ArrayList<>(); int deviceCount = 0; @@ -217,12 +217,11 @@ public class WriteTest { values.add(floatData[(int) ((i + j + timestamp) % floatData.length)]); } valuesList.add(values); - measurementsList.add(measurements); typesList.add(types); deviceCount++; } - sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList); + sessionPool.fastInsertRecords(deviceIds, times, typesList, valuesList); return deviceCount; } } diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java index ca352705c7..ddbb4a7c16 100644 --- a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java +++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java @@ -180,19 +180,19 @@ public class WriteTestFixParallel { long startTime1 = System.nanoTime(); new Thread( - () -> { - while (true) { - try { - TimeUnit.MINUTES.sleep(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - long currentTime = System.nanoTime(); - LOGGER.info( - "write rate: {} lines/minute", - totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L)); - } - }) + () -> { + while (true) { + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + long currentTime = System.nanoTime(); + LOGGER.info( + "write rate: {} lines/minute", + totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L)); + } + }) .start(); } diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java index 2f5c017540..b3409cf019 100644 --- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java +++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java @@ -94,6 +94,13 @@ public interface ISessionPool { List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException; + void fastInsertRecords( + List<String> multiSeriesIds, + List<Long> times, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException; + @Deprecated void insertOneDeviceRecords( String deviceId, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java index 276d66779c..a263e1f9f9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java @@ -84,6 +84,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowsNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode; @@ -169,8 +170,8 @@ public enum PlanNodeType { IDENTITY_SINK((short) 70), SHUFFLE_SINK((short) 71), BATCH_ACTIVATE_TEMPLATE((short) 72), - FAST_INSERT_ROW((short) 73), + FAST_INSERT_ROWS((short) 74), ; public static final int BYTES = Short.BYTES; @@ -366,6 +367,8 @@ public enum PlanNodeType { return BatchActivateTemplateNode.deserialize(buffer); case 73: return FastInsertRowNode.deserialize(buffer); + case 74: + return FastInsertRowsNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java index 21066602a1..f33056eec5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java @@ -23,9 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.utils.TimePartitionUtils; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -53,6 +58,46 @@ public class FastInsertRowsNode extends InsertRowsNode { } } + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.FAST_INSERT_ROWS.serialize(stream); + + ReadWriteIOUtils.write(insertRowNodeList.size(), stream); + + for (InsertRowNode node : insertRowNodeList) { + node.subSerialize(stream); + } + for (Integer index : insertRowNodeIndexList) { + ReadWriteIOUtils.write(index, stream); + } + } + + public static FastInsertRowsNode deserialize(ByteBuffer byteBuffer) { + PlanNodeId planNodeId; + List<InsertRowNode> insertRowNodeList = new ArrayList<>(); + List<Integer> insertRowNodeIndex = new ArrayList<>(); + + int size = byteBuffer.getInt(); + for (int i = 0; i < size; i++) { + FastInsertRowNode insertRowNode = new FastInsertRowNode(new PlanNodeId("")); + insertRowNode.subDeserialize(byteBuffer); + insertRowNodeList.add(insertRowNode); + } + for (int i = 0; i < size; i++) { + insertRowNodeIndex.add(byteBuffer.getInt()); + } + + planNodeId = PlanNodeId.deserialize(byteBuffer); + for (InsertRowNode insertRowNode : insertRowNodeList) { + insertRowNode.setPlanNodeId(planNodeId); + } + + FastInsertRowsNode fastInsertRowsNode = new FastInsertRowsNode(planNodeId); + fastInsertRowsNode.setInsertRowNodeList(insertRowNodeList); + fastInsertRowsNode.setInsertRowNodeIndexList(insertRowNodeIndex); + return fastInsertRowsNode; + } + @Override public List<WritePlanNode> splitByPartition(Analysis analysis) { Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java index 3cb42dda31..29762bf537 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java @@ -56,10 +56,10 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode { * InsertRowsNode_2's insertRowNodeList = {InsertRowNode_1, * InsertRowNode_2} then * InsertRowsNode_2's insertRowNodeIndexList= {1, 2} respectively; */ - private List<Integer> insertRowNodeIndexList; + protected List<Integer> insertRowNodeIndexList; /** the InsertRowsNode list */ - private List<InsertRowNode> insertRowNodeList; + protected List<InsertRowNode> insertRowNodeList; public InsertRowsNode(PlanNodeId id) { super(id); diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 369f723f2d..30f1301f7e 100644 --- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -841,6 +841,30 @@ public class SessionPool implements ISessionPool { } } + @Override + public void fastInsertRecords( + List<String> multiSeriesIds, + List<Long> times, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + ISession session = getSession(); + try { + session.fastInsertRecords(multiSeriesIds, times, typesList, valuesList); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertAlignedRecords failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + /** * Insert data that belong to the same device in batch format, which can reduce the overhead of * network. This method is just like jdbc batch insert, we pack some insert request in batch and
