This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch cluster_auto_create_schema_improvement in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7852118b2d1a10418efa9b7b59546ccf79fd3356 Author: LebronAl <[email protected]> AuthorDate: Wed Feb 24 21:08:08 2021 +0800 finish cluster auto create schema improvement --- .../apache/iotdb/cluster/metadata/CMManager.java | 173 ++++++++++++--------- .../db/qp/physical/crud/InsertMultiTabletPlan.java | 16 +- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 28 ++++ 3 files changed, 132 insertions(+), 85 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index c5094c8..7731bf5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -53,8 +53,10 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; @@ -426,58 +428,79 @@ public class CMManager extends MManager { } /** - * create storage groups for CreateTimeseriesPlan and InsertPlan, also create timeseries for - * InsertPlan. Only the two kind of plans can use this method. + * create storage groups for CreateTimeseriesPlan, CreateMultiTimeseriesPlan and InsertPlan, also + * create timeseries for InsertPlan. Only the three kind of plans can use this method. 
*/ public void createSchema(PhysicalPlan plan) throws MetadataException, CheckConsistencyException { - // try to set storage group - List<PartialPath> deviceIds; - // only handle InsertPlan, CreateTimeSeriesPlan and CreateMultiTimeSeriesPlan currently, - if (plan instanceof InsertPlan - && !(plan instanceof InsertMultiTabletPlan) - && !(plan instanceof InsertRowsPlan)) { - // InsertMultiTabletPlan and InsertRowsPlan have multiple devices, and other types of - // InsertPlan have only one device. - deviceIds = Collections.singletonList(((InsertPlan) plan).getDeviceId()); + List<PartialPath> storageGroups = new ArrayList<>(); + // for InsertPlan, try to just use deviceIds to get related storage groups because there's no + // need to call getPaths to concat deviceId and sensor as they will gain same result. + // for CreateTimeSeriesPlan, use getPath() to get timeseries to get related storage group + // for CreateTimeMultiSeriesPlan, use getPaths() to get all timeseries to get related storage + // groups + if (plan instanceof InsertRowPlan + || plan instanceof InsertRowsOfOneDevicePlan + || plan instanceof InsertTabletPlan) { + storageGroups.addAll( + getStorageGroups(Collections.singletonList(((InsertPlan) plan).getDeviceId()))); + } else if (plan instanceof InsertRowsPlan) { + storageGroups.addAll( + getStorageGroups( + ((InsertRowsPlan) plan) + .getInsertRowPlanList().stream() + .map(InsertPlan::getDeviceId) + .collect(Collectors.toList()))); + } else if (plan instanceof InsertMultiTabletPlan) { + storageGroups.addAll( + getStorageGroups( + ((InsertMultiTabletPlan) plan) + .getInsertTabletPlanList().stream() + .map(InsertPlan::getDeviceId) + .collect(Collectors.toList()))); } else if (plan instanceof CreateTimeSeriesPlan) { - deviceIds = Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath()); + storageGroups.addAll( + getStorageGroups(Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath()))); } else { - deviceIds = 
plan.getPaths().stream().distinct().collect(Collectors.toList()); + storageGroups.addAll(getStorageGroups(plan.getPaths())); } - for (PartialPath deviceId : deviceIds) { - createStorageGroup(deviceId); - } + // create storage groups + createStorageGroups(storageGroups); // need to verify the storage group is created - verifyCreatedSgSuccess(deviceIds, plan); + verifyCreatedSgSuccess(storageGroups, plan); if (plan instanceof InsertPlan) { // try to create timeseries - boolean isAutoCreateTimeseriesSuccess = createTimeseries((InsertPlan) plan); - if (!isAutoCreateTimeseriesSuccess) { + if (!createTimeseries((InsertPlan) plan)) { throw new MetadataException("Failed to create timeseries from InsertPlan automatically."); } } } + /** return storage groups paths for given deviceIds or timeseries. */ + private List<PartialPath> getStorageGroups(List<PartialPath> paths) throws MetadataException { + Set<PartialPath> storageGroups = new HashSet<>(); + for (PartialPath path : paths) { + storageGroups.add( + MetaUtils.getStorageGroupPathByLevel( + path, IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel())); + } + return new ArrayList<>(storageGroups); + } + @SuppressWarnings("squid:S3776") - private void verifyCreatedSgSuccess(List<PartialPath> deviceIds, PhysicalPlan physicalPlan) - throws MetadataException { + private void verifyCreatedSgSuccess(List<PartialPath> storageGroups, PhysicalPlan physicalPlan) { long startTime = System.currentTimeMillis(); - boolean[] ready = new boolean[deviceIds.size()]; + boolean[] ready = new boolean[storageGroups.size()]; Arrays.fill(ready, false); while (true) { boolean allReady = true; - for (int i = 0; i < deviceIds.size(); i++) { + for (int i = 0; i < storageGroups.size(); i++) { if (ready[i]) { continue; } - PartialPath storageGroupName = - MetaUtils.getStorageGroupPathByLevel( - deviceIds.get(i), - IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel()); - if 
(IoTDB.metaManager.isStorageGroup(storageGroupName)) { + if (IoTDB.metaManager.isStorageGroup(storageGroups.get(i))) { ready[i] = true; } else { allReady = false; @@ -500,24 +523,23 @@ public class CMManager extends MManager { } /** - * Create storage group automatically for deviceId. + * Create storage groups automatically for paths. * - * @param deviceId + * @param storageGroups */ - private void createStorageGroup(PartialPath deviceId) throws MetadataException { - PartialPath storageGroupName = - MetaUtils.getStorageGroupPathByLevel( - deviceId, IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel()); - SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(storageGroupName); - TSStatus setStorageGroupResult = - metaGroupMember.processNonPartitionedMetaPlan(setStorageGroupPlan); - if (setStorageGroupResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && setStorageGroupResult.getCode() - != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) { - throw new MetadataException( - String.format( - "Status Code: %d, failed to set storage group %s", - setStorageGroupResult.getCode(), storageGroupName)); + private void createStorageGroups(List<PartialPath> storageGroups) throws MetadataException { + for (PartialPath storageGroup : storageGroups) { + SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(storageGroup); + TSStatus setStorageGroupResult = + metaGroupMember.processNonPartitionedMetaPlan(setStorageGroupPlan); + if (setStorageGroupResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && setStorageGroupResult.getCode() + != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) { + throw new MetadataException( + String.format( + "Status Code: %d, failed to set storage group %s", + setStorageGroupResult.getCode(), storageGroup)); + } } } @@ -604,7 +626,12 @@ public class CMManager extends MManager { private boolean createTimeseries( List<String> unregisteredSeriesList, List<String> seriesList, 
InsertPlan insertPlan) throws IllegalPathException { + List<PartialPath> paths = new ArrayList<>(); + List<TSDataType> dataTypes = new ArrayList<>(); + List<TSEncoding> encodings = new ArrayList<>(); + List<CompressionType> compressionTypes = new ArrayList<>(); for (String seriesPath : unregisteredSeriesList) { + paths.add(new PartialPath(seriesPath)); int index = seriesList.indexOf(seriesPath); TSDataType dataType; // use data types in insertPlan if provided, otherwise infer them from the values @@ -618,40 +645,32 @@ public class CMManager extends MManager { : ((InsertRowPlan) insertPlan).getValues()[index], true); } + dataTypes.add(dataType); // use default encoding and compression from the config - TSEncoding encoding = getDefaultEncoding(dataType); - CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); - CreateTimeSeriesPlan createTimeSeriesPlan = - new CreateTimeSeriesPlan( - new PartialPath(seriesPath), - dataType, - encoding, - compressionType, - null, - null, - null, - null); - // TODO-Cluster: add executeNonQueryBatch() to create the series in batch - TSStatus result; - try { - result = coordinator.processPartitionedPlan(createTimeSeriesPlan); - } catch (UnsupportedPlanException e) { - logger.error( - "Failed to create timeseries {} automatically. 
Unsupported plan exception {} ", - seriesPath, - e.getMessage()); - return false; - } - if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode() - && result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) { - logger.error( - "{} failed to execute create timeseries {}: {}", - metaGroupMember.getThisNode(), - seriesPath, - result); - return false; - } + encodings.add(getDefaultEncoding(dataType)); + compressionTypes.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); + } + CreateMultiTimeSeriesPlan plan = + new CreateMultiTimeSeriesPlan(paths, dataTypes, encodings, compressionTypes); + TSStatus result; + try { + result = coordinator.processPartitionedPlan(plan); + } catch (UnsupportedPlanException e) { + logger.error( + "Failed to create timeseries {} automatically. Unsupported plan exception {} ", + paths, + e.getMessage()); + return false; + } + if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode() + && result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) { + logger.error( + "{} failed to execute create timeseries {}: {}", + metaGroupMember.getThisNode(), + paths, + result); + return false; } return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java index f3c7cfc..6f73a74 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java @@ -114,14 +114,6 @@ public class InsertMultiTabletPlan extends InsertPlan { parentInsertTabletPlanIndexList.add(parentIndex); } - public List<InsertTabletPlan> getInsertTabletPlanList() { - return insertTabletPlanList; - } - - public List<Integer> 
getParentInsertTabletPlanIndexList() { - return parentInsertTabletPlanIndexList; - } - @Override public List<PartialPath> getPaths() { List<PartialPath> result = new ArrayList<>(); @@ -216,10 +208,18 @@ public class InsertMultiTabletPlan extends InsertPlan { this.parentInsertTabletPlanIndexList = parentInsertTabletPlanIndexList; } + public List<Integer> getParentInsertTabletPlanIndexList() { + return parentInsertTabletPlanIndexList; + } + public void setInsertTabletPlanList(List<InsertTabletPlan> insertTabletPlanList) { this.insertTabletPlanList = insertTabletPlanList; } + public List<InsertTabletPlan> getInsertTabletPlanList() { + return insertTabletPlanList; + } + public void setResults(Map<Integer, TSStatus> results) { this.results = results; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java index f3f354d..232d378 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java @@ -62,6 +62,34 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan { super(false, Operator.OperatorType.CREATE_MULTI_TIMESERIES); } + public CreateMultiTimeSeriesPlan( + List<PartialPath> paths, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors) { + this(paths, dataTypes, encodings, compressors, null, null, null, null); + } + + public CreateMultiTimeSeriesPlan( + List<PartialPath> paths, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors, + List<String> alias, + List<Map<String, String>> props, + List<Map<String, String>> tags, + List<Map<String, String>> attributes) { + super(false, Operator.OperatorType.CREATE_MULTI_TIMESERIES); + this.paths = paths; + this.dataTypes = dataTypes; + this.encodings = encodings; + 
this.compressors = compressors; + this.alias = alias; + this.props = props; + this.tags = tags; + this.attributes = attributes; + } + @Override public List<PartialPath> getPaths() { return paths;
