This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d2e4d73  [IOTDB-1747] Support for automatically creating schemas for aligned time series in cluster module (#4218)
d2e4d73 is described below

commit d2e4d734caa3695d78501e888e32aac0a2cf22f2
Author: Mrquan <[email protected]>
AuthorDate: Sun Oct 24 14:12:07 2021 +0800

    [IOTDB-1747] Support for automatically creating schemas for aligned time series in cluster module (#4218)
    
    * auto create schema for aligned timeseries
    
    * auto create schema for aligned timeseries
    
    * add e2e test for aligned timeseries
---
 .../db/engine/storagegroup/TsFileProcessor.java    |   3 +
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |   3 +
 .../test/java/org/apache/iotdb/db/sql/Cases.java   | 171 +++++++++++++++++++++
 3 files changed, 177 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index ec71755..4318a5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -386,6 +386,9 @@ public class TsFileProcessor {
     for (int i = 0; i < insertTabletPlan.getMeasurementMNodes().length; i++) {
       // for aligned timeseries
       if (insertTabletPlan.isAligned()) {
+        if (insertTabletPlan.getMeasurementMNodes()[i] == null) {
+          continue;
+        }
         VectorMeasurementSchema vectorSchema =
             (VectorMeasurementSchema) insertTabletPlan.getMeasurementMNodes()[i].getSchema();
         Object[] columns = new Object[vectorSchema.getSubMeasurementsList().size()];
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index bca9209..7e47c98 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -153,6 +153,9 @@ public abstract class InsertPlan extends PhysicalPlan {
    * @return the plan itself, with measurements replaced with the previously failed ones.
    */
   public InsertPlan getPlanFromFailed() {
+    if (isAligned && originalPrefixPath != null) {
+      prefixPath = originalPrefixPath;
+    }
     if (failedMeasurements == null) {
       return null;
     }
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 5f255ce..5ff464c 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -823,6 +824,176 @@ public abstract class Cases {
   }
 
   @Test
+  public void testAutoCreateSchemaForAlignedTimeseries()
+      throws IoTDBConnectionException, StatementExecutionException, SQLException {
+    List<String> multiMeasurementComponents = new ArrayList<>();
+    multiMeasurementComponents.add("s1");
+    multiMeasurementComponents.add("s2");
+    multiMeasurementComponents.add("s3");
+
+    List<TSDataType> types = new ArrayList<>();
+    types.add(TSDataType.INT64);
+    types.add(TSDataType.INT32);
+    types.add(TSDataType.FLOAT);
+
+    List<Object> values = new ArrayList<>();
+    values.add(1L);
+    values.add(2);
+    values.add(3.0f);
+
+    List<Long> times = new ArrayList<>();
+    times.add(1L);
+    times.add(2L);
+    times.add(3L);
+
+    session.setStorageGroup(String.format("root.sg0"));
+
+    for (long time = 1; time <= 3; time++) {
+      session.insertAlignedRecord(
+          "root.sg1.d1.v1", time, multiMeasurementComponents, types, values);
+      session.insertAlignedRecord(
+          "root.sg0.d1.v1", time, multiMeasurementComponents, types, values);
+    }
+    List<String> multiSeriesIds = new ArrayList<>();
+    List<List<String>> multiMeasurementComponentsList = new ArrayList<>();
+    List<List<TSDataType>> typeList = new ArrayList<>();
+    List<List<Object>> valueList = new ArrayList<>();
+
+    for (int i = 2; i <= 4; i++) {
+      multiMeasurementComponentsList.add(multiMeasurementComponents);
+      typeList.add(types);
+      valueList.add(values);
+      multiSeriesIds.add(String.format("root.sg%d.d1.v1", i));
+    }
+    for (long time = 1; time <= 3; time++) {
+      List<Long> tmp_times = new ArrayList<>();
+      tmp_times.add(time);
+      tmp_times.add(time);
+      tmp_times.add(time);
+      session.insertAlignedRecords(
+          multiSeriesIds, tmp_times, multiMeasurementComponentsList, typeList, valueList);
+    }
+    multiSeriesIds.clear();
+    multiSeriesIds.add("root.sg0.d2.v1");
+    multiSeriesIds.add("root.sg0.d2.v1");
+    multiSeriesIds.add("root.sg0.d2.v1");
+    session.insertAlignedRecords(
+        multiSeriesIds, times, multiMeasurementComponentsList, typeList, valueList);
+
+    session.insertAlignedRecordsOfOneDevice(
+        "root.sg5.d1.v1", times, multiMeasurementComponentsList, typeList, valueList);
+    session.insertAlignedRecordsOfOneDevice(
+        "root.sg0.d3.v1", times, multiMeasurementComponentsList, typeList, valueList);
+
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(
+        new VectorMeasurementSchema(
+            "vector",
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+
+    Tablet tablet = new Tablet("root.sg6.d1.v1", schemaList);
+    tablet.setAligned(true);
+
+    for (long row = 1; row <= 3; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, row);
+      tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(0), rowIndex, 1L);
+      tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(1), rowIndex, 2);
+      tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(2), rowIndex, 3.0f);
+    }
+    session.insertTablet(tablet, true);
+    tablet.setPrefixPath("root.sg0.d4.v1");
+    session.insertTablet(tablet, true);
+    tablet.reset();
+
+    List<IMeasurementSchema> schemaList1 = new ArrayList<>();
+    schemaList1.add(
+        new VectorMeasurementSchema(
+            "vector6",
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+    List<IMeasurementSchema> schemaList2 = new ArrayList<>();
+    schemaList2.add(
+        new VectorMeasurementSchema(
+            "vector7",
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+    List<IMeasurementSchema> schemaList3 = new ArrayList<>();
+    schemaList3.add(
+        new VectorMeasurementSchema(
+            "vector8",
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+
+    Tablet tablet1 = new Tablet("root.sg7.d1.v1", schemaList1, 100);
+    Tablet tablet2 = new Tablet("root.sg8.d1.v1", schemaList2, 100);
+    Tablet tablet3 = new Tablet("root.sg9.d1.v1", schemaList3, 100);
+    tablet1.setAligned(true);
+    tablet2.setAligned(true);
+    tablet3.setAligned(true);
+
+    Map<String, Tablet> tabletMap = new HashMap<>();
+    tabletMap.put("root.sg7.d1.v1", tablet1);
+    tabletMap.put("root.sg8.d1.v1", tablet2);
+    tabletMap.put("root.sg9.d1.v1", tablet3);
+
+    // Method 1 to add tablet data
+    for (long row = 1; row <= 3; row++) {
+      int row1 = tablet1.rowSize++;
+      int row2 = tablet2.rowSize++;
+      int row3 = tablet3.rowSize++;
+      tablet1.addTimestamp(row1, row);
+      tablet2.addTimestamp(row2, row);
+      tablet3.addTimestamp(row3, row);
+      for (int i = 0; i < 3; i++) {
+        tablet1.addValue(schemaList1.get(0).getSubMeasurementsList().get(i), row1, values.get(i));
+        tablet2.addValue(schemaList2.get(0).getSubMeasurementsList().get(i), row2, values.get(i));
+        tablet3.addValue(schemaList3.get(0).getSubMeasurementsList().get(i), row3, values.get(i));
+      }
+    }
+    session.insertTablets(tabletMap, true);
+
+    tabletMap.clear();
+    tablet1.setPrefixPath("root.sg0.d5.v1");
+    tablet2.setPrefixPath("root.sg0.d6.v1");
+    tablet3.setPrefixPath("root.sg0.d7.v1");
+    tabletMap.put("root.sg0.d5.v1", tablet1);
+    tabletMap.put("root.sg0.d6.v1", tablet2);
+    tabletMap.put("root.sg0.d7.v1", tablet3);
+    session.insertTablets(tabletMap, true);
+    tablet1.reset();
+    tablet2.reset();
+    tablet3.reset();
+
+    for (Statement readStatement : readStatements) {
+      for (int d = 1; d <= 7; d++) {
+        ResultSet resultSet =
+            readStatement.executeQuery(String.format("SELECT * from root.sg0.d%d.v1", d));
+        for (long t = 1; t <= 3; t++) {
+          Assert.assertTrue(resultSet.next());
+          Assert.assertEquals(resultSet.getLong(1), t);
+          Assert.assertEquals(resultSet.getString(2), "1");
+          Assert.assertEquals(resultSet.getString(3), "2");
+          Assert.assertEquals(resultSet.getString(4), "3.0");
+        }
+      }
+
+      for (int sg = 1; sg <= 9; sg++) {
+        ResultSet resultSet =
+            readStatement.executeQuery(String.format("SELECT * from root.sg%d.d1.v1", sg));
+        for (long t = 1; t <= 3; t++) {
+          Assert.assertTrue(resultSet.next());
+          Assert.assertEquals(resultSet.getLong(1), t);
+          Assert.assertEquals(resultSet.getString(2), "1");
+          Assert.assertEquals(resultSet.getString(3), "2");
+          Assert.assertEquals(resultSet.getString(4), "3.0");
+        }
+      }
+    }
+  }
+
+  @Test
   public void testInsertTabletWithNullValues()
       throws IoTDBConnectionException, StatementExecutionException, SQLException {
     List<IMeasurementSchema> schemaList = new ArrayList<>();

Reply via email to