This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch cluster_auto_create_schema_improvement
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7852118b2d1a10418efa9b7b59546ccf79fd3356
Author: LebronAl <[email protected]>
AuthorDate: Wed Feb 24 21:08:08 2021 +0800

    finish cluster auto create schema improvement
---
 .../apache/iotdb/cluster/metadata/CMManager.java   | 173 ++++++++++++---------
 .../db/qp/physical/crud/InsertMultiTabletPlan.java |  16 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  28 ++++
 3 files changed, 132 insertions(+), 85 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index c5094c8..7731bf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -53,8 +53,10 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
@@ -426,58 +428,79 @@ public class CMManager extends MManager {
   }
 
   /**
-   * create storage groups for CreateTimeseriesPlan and InsertPlan, also 
create timeseries for
-   * InsertPlan. Only the two kind of plans can use this method.
+   * create storage groups for CreateTimeseriesPlan, CreateMultiTimeseriesPlan 
and InsertPlan, also
+   * create timeseries for InsertPlan. Only the three kind of plans can use 
this method.
    */
   public void createSchema(PhysicalPlan plan) throws MetadataException, 
CheckConsistencyException {
-    // try to set storage group
-    List<PartialPath> deviceIds;
-    // only handle InsertPlan, CreateTimeSeriesPlan and 
CreateMultiTimeSeriesPlan currently,
-    if (plan instanceof InsertPlan
-        && !(plan instanceof InsertMultiTabletPlan)
-        && !(plan instanceof InsertRowsPlan)) {
-      // InsertMultiTabletPlan and InsertRowsPlan have multiple devices, and 
other types of
-      // InsertPlan have only one device.
-      deviceIds = Collections.singletonList(((InsertPlan) plan).getDeviceId());
+    List<PartialPath> storageGroups = new ArrayList<>();
+    // for InsertPlan, try to just use deviceIds to get related storage groups 
because there's no
+    // need to call getPaths to concat deviceId and sensor as they will gain 
same result.
+    // for CreateTimeSeriesPlan, use getPath() to get timeseries to get 
related storage group
+    // for CreateTimeMultiSeriesPlan, use getPaths() to get all timeseries to 
get related storage
+    // groups
+    if (plan instanceof InsertRowPlan
+        || plan instanceof InsertRowsOfOneDevicePlan
+        || plan instanceof InsertTabletPlan) {
+      storageGroups.addAll(
+          getStorageGroups(Collections.singletonList(((InsertPlan) 
plan).getDeviceId())));
+    } else if (plan instanceof InsertRowsPlan) {
+      storageGroups.addAll(
+          getStorageGroups(
+              ((InsertRowsPlan) plan)
+                  .getInsertRowPlanList().stream()
+                      .map(InsertPlan::getDeviceId)
+                      .collect(Collectors.toList())));
+    } else if (plan instanceof InsertMultiTabletPlan) {
+      storageGroups.addAll(
+          getStorageGroups(
+              ((InsertMultiTabletPlan) plan)
+                  .getInsertTabletPlanList().stream()
+                      .map(InsertPlan::getDeviceId)
+                      .collect(Collectors.toList())));
     } else if (plan instanceof CreateTimeSeriesPlan) {
-      deviceIds = Collections.singletonList(((CreateTimeSeriesPlan) 
plan).getPath());
+      storageGroups.addAll(
+          getStorageGroups(Collections.singletonList(((CreateTimeSeriesPlan) 
plan).getPath())));
     } else {
-      deviceIds = 
plan.getPaths().stream().distinct().collect(Collectors.toList());
+      storageGroups.addAll(getStorageGroups(plan.getPaths()));
     }
 
-    for (PartialPath deviceId : deviceIds) {
-      createStorageGroup(deviceId);
-    }
+    // create storage groups
+    createStorageGroups(storageGroups);
 
     // need to verify the storage group is created
-    verifyCreatedSgSuccess(deviceIds, plan);
+    verifyCreatedSgSuccess(storageGroups, plan);
 
     if (plan instanceof InsertPlan) {
       // try to create timeseries
-      boolean isAutoCreateTimeseriesSuccess = createTimeseries((InsertPlan) 
plan);
-      if (!isAutoCreateTimeseriesSuccess) {
+      if (!createTimeseries((InsertPlan) plan)) {
         throw new MetadataException("Failed to create timeseries from 
InsertPlan automatically.");
       }
     }
   }
 
+  /** return storage groups paths for given deviceIds or timeseries. */
+  private List<PartialPath> getStorageGroups(List<PartialPath> paths) throws 
MetadataException {
+    Set<PartialPath> storageGroups = new HashSet<>();
+    for (PartialPath path : paths) {
+      storageGroups.add(
+          MetaUtils.getStorageGroupPathByLevel(
+              path, 
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel()));
+    }
+    return new ArrayList<>(storageGroups);
+  }
+
   @SuppressWarnings("squid:S3776")
-  private void verifyCreatedSgSuccess(List<PartialPath> deviceIds, 
PhysicalPlan physicalPlan)
-      throws MetadataException {
+  private void verifyCreatedSgSuccess(List<PartialPath> storageGroups, 
PhysicalPlan physicalPlan) {
     long startTime = System.currentTimeMillis();
-    boolean[] ready = new boolean[deviceIds.size()];
+    boolean[] ready = new boolean[storageGroups.size()];
     Arrays.fill(ready, false);
     while (true) {
       boolean allReady = true;
-      for (int i = 0; i < deviceIds.size(); i++) {
+      for (int i = 0; i < storageGroups.size(); i++) {
         if (ready[i]) {
           continue;
         }
-        PartialPath storageGroupName =
-            MetaUtils.getStorageGroupPathByLevel(
-                deviceIds.get(i),
-                
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel());
-        if (IoTDB.metaManager.isStorageGroup(storageGroupName)) {
+        if (IoTDB.metaManager.isStorageGroup(storageGroups.get(i))) {
           ready[i] = true;
         } else {
           allReady = false;
@@ -500,24 +523,23 @@ public class CMManager extends MManager {
   }
 
   /**
-   * Create storage group automatically for deviceId.
+   * Create storage groups automatically for paths.
    *
-   * @param deviceId
+   * @param storageGroups
    */
-  private void createStorageGroup(PartialPath deviceId) throws 
MetadataException {
-    PartialPath storageGroupName =
-        MetaUtils.getStorageGroupPathByLevel(
-            deviceId, 
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel());
-    SetStorageGroupPlan setStorageGroupPlan = new 
SetStorageGroupPlan(storageGroupName);
-    TSStatus setStorageGroupResult =
-        metaGroupMember.processNonPartitionedMetaPlan(setStorageGroupPlan);
-    if (setStorageGroupResult.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && setStorageGroupResult.getCode()
-            != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
-      throw new MetadataException(
-          String.format(
-              "Status Code: %d, failed to set storage group %s",
-              setStorageGroupResult.getCode(), storageGroupName));
+  private void createStorageGroups(List<PartialPath> storageGroups) throws 
MetadataException {
+    for (PartialPath storageGroup : storageGroups) {
+      SetStorageGroupPlan setStorageGroupPlan = new 
SetStorageGroupPlan(storageGroup);
+      TSStatus setStorageGroupResult =
+          metaGroupMember.processNonPartitionedMetaPlan(setStorageGroupPlan);
+      if (setStorageGroupResult.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && setStorageGroupResult.getCode()
+              != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+        throw new MetadataException(
+            String.format(
+                "Status Code: %d, failed to set storage group %s",
+                setStorageGroupResult.getCode(), storageGroup));
+      }
     }
   }
 
@@ -604,7 +626,12 @@ public class CMManager extends MManager {
   private boolean createTimeseries(
       List<String> unregisteredSeriesList, List<String> seriesList, InsertPlan 
insertPlan)
       throws IllegalPathException {
+    List<PartialPath> paths = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    List<CompressionType> compressionTypes = new ArrayList<>();
     for (String seriesPath : unregisteredSeriesList) {
+      paths.add(new PartialPath(seriesPath));
       int index = seriesList.indexOf(seriesPath);
       TSDataType dataType;
       // use data types in insertPlan if provided, otherwise infer them from 
the values
@@ -618,40 +645,32 @@ public class CMManager extends MManager {
                     : ((InsertRowPlan) insertPlan).getValues()[index],
                 true);
       }
+      dataTypes.add(dataType);
       // use default encoding and compression from the config
-      TSEncoding encoding = getDefaultEncoding(dataType);
-      CompressionType compressionType = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
-      CreateTimeSeriesPlan createTimeSeriesPlan =
-          new CreateTimeSeriesPlan(
-              new PartialPath(seriesPath),
-              dataType,
-              encoding,
-              compressionType,
-              null,
-              null,
-              null,
-              null);
-      // TODO-Cluster: add executeNonQueryBatch() to create the series in batch
-      TSStatus result;
-      try {
-        result = coordinator.processPartitionedPlan(createTimeSeriesPlan);
-      } catch (UnsupportedPlanException e) {
-        logger.error(
-            "Failed to create timeseries {} automatically. Unsupported plan 
exception {} ",
-            seriesPath,
-            e.getMessage());
-        return false;
-      }
-      if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.getCode() != 
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()
-          && result.getCode() != 
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
-        logger.error(
-            "{} failed to execute create timeseries {}: {}",
-            metaGroupMember.getThisNode(),
-            seriesPath,
-            result);
-        return false;
-      }
+      encodings.add(getDefaultEncoding(dataType));
+      
compressionTypes.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
+    }
+    CreateMultiTimeSeriesPlan plan =
+        new CreateMultiTimeSeriesPlan(paths, dataTypes, encodings, 
compressionTypes);
+    TSStatus result;
+    try {
+      result = coordinator.processPartitionedPlan(plan);
+    } catch (UnsupportedPlanException e) {
+      logger.error(
+          "Failed to create timeseries {} automatically. Unsupported plan 
exception {} ",
+          paths,
+          e.getMessage());
+      return false;
+    }
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && result.getCode() != 
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()
+        && result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+      logger.error(
+          "{} failed to execute create timeseries {}: {}",
+          metaGroupMember.getThisNode(),
+          paths,
+          result);
+      return false;
     }
     return true;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index f3c7cfc..6f73a74 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -114,14 +114,6 @@ public class InsertMultiTabletPlan extends InsertPlan {
     parentInsertTabletPlanIndexList.add(parentIndex);
   }
 
-  public List<InsertTabletPlan> getInsertTabletPlanList() {
-    return insertTabletPlanList;
-  }
-
-  public List<Integer> getParentInsertTabletPlanIndexList() {
-    return parentInsertTabletPlanIndexList;
-  }
-
   @Override
   public List<PartialPath> getPaths() {
     List<PartialPath> result = new ArrayList<>();
@@ -216,10 +208,18 @@ public class InsertMultiTabletPlan extends InsertPlan {
     this.parentInsertTabletPlanIndexList = parentInsertTabletPlanIndexList;
   }
 
+  public List<Integer> getParentInsertTabletPlanIndexList() {
+    return parentInsertTabletPlanIndexList;
+  }
+
   public void setInsertTabletPlanList(List<InsertTabletPlan> 
insertTabletPlanList) {
     this.insertTabletPlanList = insertTabletPlanList;
   }
 
+  public List<InsertTabletPlan> getInsertTabletPlanList() {
+    return insertTabletPlanList;
+  }
+
   public void setResults(Map<Integer, TSStatus> results) {
     this.results = results;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index f3f354d..232d378 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -62,6 +62,34 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
     super(false, Operator.OperatorType.CREATE_MULTI_TIMESERIES);
   }
 
+  public CreateMultiTimeSeriesPlan(
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors) {
+    this(paths, dataTypes, encodings, compressors, null, null, null, null);
+  }
+
+  public CreateMultiTimeSeriesPlan(
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<String> alias,
+      List<Map<String, String>> props,
+      List<Map<String, String>> tags,
+      List<Map<String, String>> attributes) {
+    super(false, Operator.OperatorType.CREATE_MULTI_TIMESERIES);
+    this.paths = paths;
+    this.dataTypes = dataTypes;
+    this.encodings = encodings;
+    this.compressors = compressors;
+    this.alias = alias;
+    this.props = props;
+    this.tags = tags;
+    this.attributes = attributes;
+  }
+
   @Override
   public List<PartialPath> getPaths() {
     return paths;

Reply via email to