This is an automated email from the ASF dual-hosted git repository. neuyilan pushed a commit to branch optimize_insert_records in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 270d9358a1654f1e7b31a6f2fedf9ca689450e0a Author: HouliangQi <[email protected]> AuthorDate: Fri Feb 19 20:09:13 2021 +0800 optimize the insertRecords session interface for cluster version --- .../iotdb/cluster/coordinator/Coordinator.java | 31 ++- .../cluster/log/applier/AsyncDataLogApplier.java | 8 + .../iotdb/cluster/log/applier/DataLogApplier.java | 11 ++ .../apache/iotdb/cluster/metadata/CMManager.java | 34 +++- .../iotdb/cluster/query/ClusterPlanRouter.java | 35 +++- .../apache/iotdb/db/qp/executor/IPlanExecutor.java | 8 + .../apache/iotdb/db/qp/executor/PlanExecutor.java | 14 ++ .../org/apache/iotdb/db/qp/logical/Operator.java | 3 +- .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 8 +- .../iotdb/db/qp/physical/crud/InsertRowPlan.java | 10 +- .../physical/crud/InsertRowsOfOneDevicePlan.java | 4 +- .../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 207 +++++++++++++++++++++ .../org/apache/iotdb/db/service/TSServiceImpl.java | 55 +++--- 13 files changed, 383 insertions(+), 45 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java index e284420..4fcbdd9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; @@ -307,8 +308,10 @@ public class Coordinator { } else { if (plan instanceof InsertTabletPlan || plan instanceof InsertMultiTabletPlan - || plan instanceof CreateMultiTimeSeriesPlan) { - // InsertTabletPlan, InsertMultiTabletPlan and CreateMultiTimeSeriesPlan contains many rows, + || plan instanceof CreateMultiTimeSeriesPlan + || plan instanceof InsertRowsPlan) { + // InsertTabletPlan, InsertMultiTabletPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan + // contains many rows, // each will correspond to a TSStatus as its execution result, // as the plan is split and the sub-plans may have interleaving ranges, // we must assure that each TSStatus is placed to the right position @@ -458,7 +461,10 @@ public class Coordinator { totalRowNum = ((InsertMultiTabletPlan) parentPlan).getTabletsSize(); } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size(); + } else if (parentPlan instanceof InsertRowsPlan) { + totalRowNum = ((InsertRowsPlan) parentPlan).getRowCount(); } + if (subStatus == null) { subStatus = new TSStatus[totalRowNum]; Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); @@ -494,8 +500,14 @@ public class Coordinator { for (int i = 0; i < subPlan.getIndexes().size(); i++) { subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i); } + } else if (parentPlan instanceof InsertRowsPlan) { + InsertRowsPlan subPlan = (InsertRowsPlan) entry.getKey(); + for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) { + subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i); + } } } + if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // execution failed, record the error message errorCodePartitionGroups.add( @@ -551,6 +563,21 @@ public class Coordinator { subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue(); } } + + if (parentPlan instanceof InsertRowsPlan + && !((InsertRowsPlan) parentPlan).getResults().isEmpty()) { + if (subStatus == null) { + subStatus = new TSStatus[totalRowNum]; + Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); + } + noFailure = false; + isBatchFailure = true; + for (Map.Entry<Integer, TSStatus> integerTSStatusEntry : + ((InsertRowsPlan) parentPlan).getResults().entrySet()) { + subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue(); + } + } + return concludeFinalStatus( noFailure, endPoint, isBatchFailure, subStatus, errorCodePartitionGroups); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java index 8c4c2eb..65a5b2d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.service.IoTDB; @@ -142,6 +143,10 @@ public class AsyncDataLogApplier implements LogApplier { * same. this is done in {@link * org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertMultiTabletPlan)} * + * <p>We can also sure that the storage group of all InsertRowPlans in InsertRowsPlan are the + * same. this is done in {@link + * org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertRowsPlan)} + * * @return the sg that the plan belongs to * @throws StorageGroupNotSetException if no sg found */ @@ -150,6 +155,9 @@ public class AsyncDataLogApplier implements LogApplier { if (plan instanceof InsertMultiTabletPlan) { PartialPath deviceId = ((InsertMultiTabletPlan) plan).getFirstDeviceId(); sgPath = IoTDB.metaManager.getStorageGroupPath(deviceId); + } else if (plan instanceof InsertRowsPlan) { + PartialPath path = ((InsertRowsPlan) plan).getFirstDeviceId(); + sgPath = IoTDB.metaManager.getStorageGroupPath(path); } else if (plan instanceof InsertPlan) { PartialPath deviceId = ((InsertPlan) plan).getDeviceId(); sgPath = IoTDB.metaManager.getStorageGroupPath(deviceId); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java index 5eecc7d..7d11d3e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java @@ -37,6 +37,8 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.service.IoTDB; @@ -67,6 +69,8 @@ public class DataLogApplier extends BaseApplier { PhysicalPlan plan = physicalPlanLog.getPlan(); if (plan instanceof InsertMultiTabletPlan) { applyInsert((InsertMultiTabletPlan) plan); + } else if (plan instanceof InsertRowsPlan) { + applyInsert((InsertRowsPlan) plan); } else if (plan instanceof InsertPlan) { applyInsert((InsertPlan) plan); } else { @@ -101,6 +105,13 @@ public class DataLogApplier extends BaseApplier { } } + private void applyInsert(InsertRowsPlan plan) + throws StorageGroupNotSetException, QueryProcessException, StorageEngineException { + for (InsertRowPlan insertRowPlan : plan.getInsertRowPlanList()) { + applyInsert(insertRowPlan); + } + } + private void applyInsert(InsertPlan plan) throws StorageGroupNotSetException, QueryProcessException, StorageEngineException { // check if the corresponding slot is being pulled diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index 7ef9640..e597af3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -53,6 +53,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; @@ -406,8 +407,12 @@ public class CMManager extends MManager { public void createSchema(PhysicalPlan plan) throws MetadataException, CheckConsistencyException { // try to set storage group List<PartialPath> deviceIds; - // only handle InsertPlan, CreateTimeSeriesPlan and CreateMultiTimeSeriesPlan currently - if (plan instanceof InsertPlan && !(plan instanceof InsertMultiTabletPlan)) { + // only handle InsertPlan, CreateTimeSeriesPlan and CreateMultiTimeSeriesPlan currently, + if (plan instanceof InsertPlan + && !(plan instanceof InsertMultiTabletPlan) + && !(plan instanceof InsertRowsPlan)) { + // InsertMultiTabletPlan and InsertRowsPlan have multiple devices, and other types of + // InsertPlan have only one device. deviceIds = Collections.singletonList(((InsertPlan) plan).getDeviceId()); } else if (plan instanceof CreateTimeSeriesPlan) { deviceIds = Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath()); @@ -503,7 +508,26 @@ public class CMManager extends MManager { boolean success = createTimeseries(insertTabletPlan); allSuccess = allSuccess && success; if (!success) { - logger.error("create timeseries for device={} failed", insertTabletPlan.getDeviceId()); + logger.error( + "create timeseries for device={} failed, plan={}", + insertTabletPlan.getDeviceId(), + insertTabletPlan); + } + } + return allSuccess; + } + + public boolean createTimeseries(InsertRowsPlan insertRowsPlan) + throws CheckConsistencyException, IllegalPathException { + boolean allSuccess = true; + for (InsertRowPlan insertRowPlan : insertRowsPlan.getInsertRowPlanList()) { + boolean success = createTimeseries(insertRowPlan); + allSuccess = allSuccess && success; + if (!success) { + logger.error( + "create timeseries for device={} failed, plan={}", + insertRowPlan.getDeviceId(), + insertRowPlan); } } return allSuccess; @@ -521,6 +545,10 @@ public class CMManager extends MManager { return createTimeseries((InsertMultiTabletPlan) insertPlan); } + if (insertPlan instanceof InsertRowsPlan) { + return createTimeseries((InsertRowsPlan) insertPlan); + } + List<String> seriesList = new ArrayList<>(); PartialPath deviceId = insertPlan.getDeviceId(); PartialPath storageGroupName; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java index c573449..acb0b77 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CountPlan; @@ -52,6 +53,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; public class ClusterPlanRouter { @@ -110,7 +112,9 @@ public class ClusterPlanRouter { public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan) throws UnsupportedPlanException, MetadataException { - if (plan instanceof InsertTabletPlan) { + if (plan instanceof InsertRowsPlan) { + return splitAndRoutePlan((InsertRowsPlan) plan); + } else if (plan instanceof InsertTabletPlan) { return splitAndRoutePlan((InsertTabletPlan) plan); } else if (plan instanceof InsertMultiTabletPlan) { return splitAndRoutePlan((InsertMultiTabletPlan) plan); @@ -227,6 +231,35 @@ public class ClusterPlanRouter { return result; } + /** + * @param insertRowsPlan InsertRowsPlan + * @return key is InsertRowsPlan, value is the partition group the plan belongs to, all + * InsertRowPlans in InsertRowsPlan belongs to one same storage group. + */ + private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowsPlan insertRowsPlan) + throws MetadataException { + Map<PhysicalPlan, PartitionGroup> result = new HashMap<>(); + Map<PartitionGroup, InsertRowsPlan> groupPlanMap = new HashMap<>(); + for (int i = 0; i < insertRowsPlan.getInsertRowPlanList().size(); i++) { + InsertRowPlan rowPlan = insertRowsPlan.getInsertRowPlanList().get(i); + PartialPath storageGroup = getMManager().getStorageGroupPath(rowPlan.getDeviceId()); + PartitionGroup group = partitionTable.route(storageGroup.getFullPath(), rowPlan.getTime()); + if (groupPlanMap.containsKey(group)) { + InsertRowsPlan tmpPlan = groupPlanMap.get(group); + tmpPlan.addOneInsertRowPlan(rowPlan, i); + } else { + InsertRowsPlan tmpPlan = new InsertRowsPlan(); + tmpPlan.addOneInsertRowPlan(rowPlan, i); + groupPlanMap.put(group, tmpPlan); + } + } + + for (Entry<PartitionGroup, InsertRowsPlan> entry : groupPlanMap.entrySet()) { + result.put(entry.getValue(), entry.getKey()); + } + return result; + } + @SuppressWarnings("SuspiciousSystemArraycopy") private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertTabletPlan plan) throws MetadataException { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java index 76defce..af24983 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; @@ -99,6 +100,13 @@ public interface IPlanExecutor { /** * execute insert command and return whether the operator is successful. * + * @param insertRowsPlan physical insert rows plan, which contains multi insertRowPlans + */ + void insert(InsertRowsPlan insertRowsPlan) throws QueryProcessException; + + /** + * execute insert command and return whether the operator is successful. + * * @param insertRowsOfOneDevicePlan physical insert plan */ void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index c35ae48..02ba2d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -67,6 +67,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan; @@ -223,6 +224,9 @@ public class PlanExecutor implements IPlanExecutor { case BATCH_INSERT_ONE_DEVICE: insert((InsertRowsOfOneDevicePlan) plan); return true; + case BATCH_INSERT_ROWS: + insert((InsertRowsPlan) plan); + return true; case BATCHINSERT: insertTablet((InsertTabletPlan) plan); return true; @@ -1098,6 +1102,16 @@ public class PlanExecutor implements IPlanExecutor { } @Override + public void insert(InsertRowsPlan plan) throws QueryProcessException { + for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) { + if (plan.getResults().containsKey(i)) { + continue; + } + insert(plan.getInsertRowPlanList().get(i)); + } + } + + @Override public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException { try { insertRowPlan.setMeasurementMNodes( diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java index 3be1099..d1e9971 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java @@ -145,6 +145,7 @@ public abstract class Operator { MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, BATCH_INSERT_ONE_DEVICE, - MULTI_BATCH_INSERT; + MULTI_BATCH_INSERT, + BATCH_INSERT_ROWS } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index 29888ea..b329812 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; @@ -348,6 +349,10 @@ public abstract class PhysicalPlan { plan = new StorageGroupMNodePlan(); plan.deserialize(buffer); break; + case BATCH_INSERT_ROWS: + plan = new InsertRowsPlan(); + plan.deserialize(buffer); + break; default: throw new IOException("unrecognized log type " + type); } @@ -390,7 +395,8 @@ public abstract class PhysicalPlan { MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, BATCH_INSERT_ONE_DEVICE, - MULTI_BATCH_INSERT + MULTI_BATCH_INSERT, + BATCH_INSERT_ROWS } public long getIndex() { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java index 58b80c8..8399527 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java @@ -290,8 +290,11 @@ public class InsertRowPlan extends InsertPlan { public void serialize(DataOutputStream stream) throws IOException { int type = PhysicalPlanType.INSERT.ordinal(); stream.writeByte((byte) type); - stream.writeLong(time); + subSerialize(stream); + } + public void subSerialize(DataOutputStream stream) throws IOException { + stream.writeLong(time); putString(stream, deviceId.getFullPath()); serializeMeasurementsAndValues(stream); } @@ -435,8 +438,11 @@ public class InsertRowPlan extends InsertPlan { public void serialize(ByteBuffer buffer) { int type = PhysicalPlanType.INSERT.ordinal(); buffer.put((byte) type); - buffer.putLong(time); + subSerialize(buffer); + } + public void subSerialize(ByteBuffer buffer) { + buffer.putLong(time); putString(buffer, deviceId.getFullPath()); serializeMeasurementsAndValues(buffer); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java index e3211f4..e13dc83 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java @@ -108,11 +108,10 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan { @Override public void serialize(ByteBuffer buffer) { - int type = PhysicalPlanType.INSERT.ordinal(); + int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal(); buffer.put((byte) type); putString(buffer, deviceId.getFullPath()); - buffer.putInt(rowPlans.length); for (InsertRowPlan plan : rowPlans) { buffer.putLong(plan.getTime()); @@ -122,7 +121,6 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan { @Override public void deserialize(ByteBuffer buffer) throws IllegalPathException { - this.deviceId = new PartialPath(readString(buffer)); this.rowPlans = new InsertRowPlan[buffer.getInt()]; for (int i = 0; i < rowPlans.length; i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java new file mode 100644 index 0000000..5ffae25 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.qp.physical.crud; + +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.logical.Operator.OperatorType; +import org.apache.iotdb.service.rpc.thrift.TSStatus; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class InsertRowsPlan extends InsertPlan { + /** + * Suppose there is an InsertRowsPlan, which contains 5 InsertRowPlans, + * insertRowPlanList={InsertRowPlan_0, InsertRowPlan_1, InsertRowPlan_2, InsertRowPlan_3, + * InsertRowPlan_4}, then the insertRowPlanIndexList={0, 1, 2, 3, 4} respectively. But when the + * InsertRowsPlan is split into two InsertRowsPlans according to different storage group in + * cluster version, suppose that the InsertRowsPlan_1's insertRowPlanList = {InsertRowPlan_0, + * InsertRowPlan_3, InsertRowPlan_4}, then InsertRowsPlan_1's insertRowPlanIndexList = {0, 3, 4}; + * InsertRowsPlan_2's insertRowPlanList = {InsertRowPlan_1, * InsertRowPlan_2} then + * InsertRowsPlan_2's insertRowPlanIndexList= {1, 2} respectively; + */ + private List<Integer> insertRowPlanIndexList; + + /** the InsertRowsPlan list */ + private List<InsertRowPlan> insertRowPlanList; + + /** record the result of insert rows */ + private Map<Integer, TSStatus> results = new HashMap<>(); + + public InsertRowsPlan() { + super(OperatorType.BATCH_INSERT_ROWS); + insertRowPlanList = new ArrayList<>(); + insertRowPlanIndexList = new ArrayList<>(); + } + + @Override + public long getMinTime() { + long minTime = Long.MAX_VALUE; + for (InsertRowPlan insertRowPlan : insertRowPlanList) { + if (insertRowPlan.getMinTime() < minTime) { + minTime = insertRowPlan.getMinTime(); + } + } + return minTime; + } + + @Override + public List<PartialPath> getPaths() { + List<PartialPath> result = new ArrayList<>(); + for (InsertRowPlan insertRowPlan : insertRowPlanList) { + result.addAll(insertRowPlan.getPaths()); + } + return result; + } + + @Override + public void checkIntegrity() throws QueryProcessException { + for (InsertRowPlan insertRowPlan : insertRowPlanList) { + insertRowPlan.checkIntegrity(); + } + } + + @Override + public void recoverFromFailure() { + for (InsertRowPlan insertRowPlan : insertRowPlanList) { + insertRowPlan.recoverFromFailure(); + } + } + + @Override + public InsertPlan getPlanFromFailed() { + if (super.getPlanFromFailed() == null) { + return null; + } + List<InsertRowPlan> plans = new ArrayList<>(); + List<Integer> indexes = new ArrayList<>(); + for (int i = 0; i < insertRowPlanList.size(); i++) { + if (insertRowPlanList.get(i).hasFailedValues()) { + plans.add((InsertRowPlan) insertRowPlanList.get(i).getPlanFromFailed()); + indexes.add(i); + } + } + this.insertRowPlanList = plans; + this.insertRowPlanIndexList = indexes; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + InsertRowsPlan that = (InsertRowsPlan) o; + + if (!Objects.equals(insertRowPlanIndexList, that.insertRowPlanIndexList)) { + return false; + } + if (!Objects.equals(insertRowPlanList, that.insertRowPlanList)) { + return false; + } + return Objects.equals(results, that.results); + } + + @Override + public int hashCode() { + int result = insertRowPlanIndexList != null ? insertRowPlanIndexList.hashCode() : 0; + result = 31 * result + (insertRowPlanList != null ? insertRowPlanList.hashCode() : 0); + result = 31 * result + (results != null ? results.hashCode() : 0); + return result; + } + + @Override + public void serialize(ByteBuffer buffer) { + int type = PhysicalPlanType.BATCH_INSERT_ROWS.ordinal(); + buffer.put((byte) type); + buffer.putInt(insertRowPlanList.size()); + for (InsertRowPlan insertRowPlan : insertRowPlanList) { + insertRowPlan.subSerialize(buffer); + } + for (Integer index : insertRowPlanIndexList) { + buffer.putInt(index); + } + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + int type = PhysicalPlanType.BATCH_INSERT_ROWS.ordinal(); + stream.writeByte((byte) type); + stream.writeInt(insertRowPlanList.size()); + for (InsertRowPlan insertRowPlan : insertRowPlanList) { + insertRowPlan.subSerialize(stream); + } + for (Integer index : insertRowPlanIndexList) { + stream.writeInt(index); + } + } + + @Override + public void deserialize(ByteBuffer buffer) throws IllegalPathException { + int size = buffer.getInt(); + this.insertRowPlanList = new ArrayList<>(size); + this.insertRowPlanIndexList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + InsertRowPlan insertRowPlan = new InsertRowPlan(); + insertRowPlan.deserialize(buffer); + insertRowPlanList.add(insertRowPlan); + } + + for (int i = 0; i < size; i++) { + insertRowPlanIndexList.add(buffer.getInt()); + } + } + + public Map<Integer, TSStatus> getResults() { + return results; + } + + public void addOneInsertRowPlan(InsertRowPlan plan, int index) { + insertRowPlanList.add(plan); + insertRowPlanIndexList.add(index); + } + + public List<Integer> getInsertRowPlanIndexList() { + return insertRowPlanIndexList; + } + + public List<InsertRowPlan> getInsertRowPlanList() { + return insertRowPlanList; + } + + public int getRowCount() { + return insertRowPlanList.size(); + } + + public PartialPath getFirstDeviceId() { + return insertRowPlanList.get(0).getDeviceId(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 5be8485..ca5842e 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; @@ -1147,9 +1148,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { req.getTimestamps().get(0)); } - List<TSStatus> statusList = new ArrayList<>(); - - boolean isAllSuccessful = true; + InsertRowsPlan insertRowsPlan = new InsertRowsPlan(); for (int i = 0; i < req.deviceIds.size(); i++) { try { InsertRowPlan plan = @@ -1159,23 +1158,20 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i)); TSStatus status = checkAuthority(plan, req.getSessionId()); - if (status == null) { - status = executeNonQueryPlan(plan); - isAllSuccessful = - ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) - && isAllSuccessful); + if (status != null) { + insertRowsPlan.getResults().put(i, status); } - statusList.add(status); + insertRowsPlan.addOneInsertRowPlan(plan, i); } catch (Exception e) { - isAllSuccessful = false; - statusList.add( - onNPEOrUnexpectedException(e, "inserting records", TSStatusCode.INTERNAL_SERVER_ERROR)); + insertRowsPlan + .getResults() + .put( + i, + onNPEOrUnexpectedException( + e, "inserting records", TSStatusCode.INTERNAL_SERVER_ERROR)); } } - - return isAllSuccessful - ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) - : RpcUtils.getStatus(statusList); + return executeNonQueryPlan(insertRowsPlan); } @Override @@ -1234,9 +1230,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { req.getTimestamps().get(0)); } - List<TSStatus> statusList = new ArrayList<>(); InsertRowPlan plan = new InsertRowPlan(); - boolean isAllSuccessful = true; + InsertRowsPlan insertRowsPlan = new InsertRowsPlan(); for (int i = 0; i < req.deviceIds.size(); i++) { try { plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i))); @@ -1246,24 +1241,20 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { plan.setValues(req.getValuesList().get(i).toArray(new Object[0])); plan.setNeedInferType(true); TSStatus status = checkAuthority(plan, req.getSessionId()); - if (status == null) { - status = executeNonQueryPlan(plan); - isAllSuccessful = - ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) - && isAllSuccessful); + if (status != null) { + insertRowsPlan.getResults().put(i, status); } - statusList.add(status); + insertRowsPlan.addOneInsertRowPlan(plan, i); } catch (Exception e) { - isAllSuccessful = false; - statusList.add( - onNPEOrUnexpectedException( - e, "inserting string records", TSStatusCode.INTERNAL_SERVER_ERROR)); + insertRowsPlan + .getResults() + .put( + i, + onNPEOrUnexpectedException( + e, "inserting string records", TSStatusCode.INTERNAL_SERVER_ERROR)); } } - - return isAllSuccessful - ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) - : RpcUtils.getStatus(statusList); + return executeNonQueryPlan(insertRowsPlan); } @Override
