This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d2e4d73 [IOTDB-1747] Support for automatically creating schemas for aligned time series in cluster module (#4218)
d2e4d73 is described below
commit d2e4d734caa3695d78501e888e32aac0a2cf22f2
Author: Mrquan <[email protected]>
AuthorDate: Sun Oct 24 14:12:07 2021 +0800
[IOTDB-1747] Support for automatically creating schemas for aligned time series in cluster module (#4218)
* auto create schema for aligned timeseries
* auto create schema for aligned timeseries
* add e2e test for aligned timeseries
---
.../db/engine/storagegroup/TsFileProcessor.java | 3 +
.../iotdb/db/qp/physical/crud/InsertPlan.java | 3 +
.../test/java/org/apache/iotdb/db/sql/Cases.java | 171 +++++++++++++++++++++
3 files changed, 177 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index ec71755..4318a5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -386,6 +386,9 @@ public class TsFileProcessor {
for (int i = 0; i < insertTabletPlan.getMeasurementMNodes().length; i++) {
// for aligned timeseries
if (insertTabletPlan.isAligned()) {
+ if (insertTabletPlan.getMeasurementMNodes()[i] == null) {
+ continue;
+ }
VectorMeasurementSchema vectorSchema =
(VectorMeasurementSchema) insertTabletPlan.getMeasurementMNodes()[i].getSchema();
Object[] columns = new Object[vectorSchema.getSubMeasurementsList().size()];
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index bca9209..7e47c98 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -153,6 +153,9 @@ public abstract class InsertPlan extends PhysicalPlan {
* @return the plan itself, with measurements replaced with the previously failed ones.
*/
public InsertPlan getPlanFromFailed() {
+ if (isAligned && originalPrefixPath != null) {
+ prefixPath = originalPrefixPath;
+ }
if (failedMeasurements == null) {
return null;
}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 5f255ce..5ff464c 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
@@ -823,6 +824,176 @@ public abstract class Cases {
}
@Test
+ public void testAutoCreateSchemaForAlignedTimeseries()
+ throws IoTDBConnectionException, StatementExecutionException, SQLException {
+ List<String> multiMeasurementComponents = new ArrayList<>();
+ multiMeasurementComponents.add("s1");
+ multiMeasurementComponents.add("s2");
+ multiMeasurementComponents.add("s3");
+
+ List<TSDataType> types = new ArrayList<>();
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT32);
+ types.add(TSDataType.FLOAT);
+
+ List<Object> values = new ArrayList<>();
+ values.add(1L);
+ values.add(2);
+ values.add(3.0f);
+
+ List<Long> times = new ArrayList<>();
+ times.add(1L);
+ times.add(2L);
+ times.add(3L);
+
+ session.setStorageGroup(String.format("root.sg0"));
+
+ for (long time = 1; time <= 3; time++) {
+ session.insertAlignedRecord(
+ "root.sg1.d1.v1", time, multiMeasurementComponents, types, values);
+ session.insertAlignedRecord(
+ "root.sg0.d1.v1", time, multiMeasurementComponents, types, values);
+ }
+ List<String> multiSeriesIds = new ArrayList<>();
+ List<List<String>> multiMeasurementComponentsList = new ArrayList<>();
+ List<List<TSDataType>> typeList = new ArrayList<>();
+ List<List<Object>> valueList = new ArrayList<>();
+
+ for (int i = 2; i <= 4; i++) {
+ multiMeasurementComponentsList.add(multiMeasurementComponents);
+ typeList.add(types);
+ valueList.add(values);
+ multiSeriesIds.add(String.format("root.sg%d.d1.v1", i));
+ }
+ for (long time = 1; time <= 3; time++) {
+ List<Long> tmp_times = new ArrayList<>();
+ tmp_times.add(time);
+ tmp_times.add(time);
+ tmp_times.add(time);
+ session.insertAlignedRecords(
+ multiSeriesIds, tmp_times, multiMeasurementComponentsList, typeList, valueList);
+ }
+ multiSeriesIds.clear();
+ multiSeriesIds.add("root.sg0.d2.v1");
+ multiSeriesIds.add("root.sg0.d2.v1");
+ multiSeriesIds.add("root.sg0.d2.v1");
+ session.insertAlignedRecords(
+ multiSeriesIds, times, multiMeasurementComponentsList, typeList, valueList);
+
+ session.insertAlignedRecordsOfOneDevice(
+ "root.sg5.d1.v1", times, multiMeasurementComponentsList, typeList, valueList);
+ session.insertAlignedRecordsOfOneDevice(
+ "root.sg0.d3.v1", times, multiMeasurementComponentsList, typeList, valueList);
+
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(
+ new VectorMeasurementSchema(
+ "vector",
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+
+ Tablet tablet = new Tablet("root.sg6.d1.v1", schemaList);
+ tablet.setAligned(true);
+
+ for (long row = 1; row <= 3; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, row);
+ tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(0), rowIndex, 1L);
+ tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(1), rowIndex, 2);
+ tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(2), rowIndex, 3.0f);
+ }
+ session.insertTablet(tablet, true);
+ tablet.setPrefixPath("root.sg0.d4.v1");
+ session.insertTablet(tablet, true);
+ tablet.reset();
+
+ List<IMeasurementSchema> schemaList1 = new ArrayList<>();
+ schemaList1.add(
+ new VectorMeasurementSchema(
+ "vector6",
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+ List<IMeasurementSchema> schemaList2 = new ArrayList<>();
+ schemaList2.add(
+ new VectorMeasurementSchema(
+ "vector7",
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+ List<IMeasurementSchema> schemaList3 = new ArrayList<>();
+ schemaList3.add(
+ new VectorMeasurementSchema(
+ "vector8",
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.INT64, TSDataType.INT32, TSDataType.FLOAT}));
+
+ Tablet tablet1 = new Tablet("root.sg7.d1.v1", schemaList1, 100);
+ Tablet tablet2 = new Tablet("root.sg8.d1.v1", schemaList2, 100);
+ Tablet tablet3 = new Tablet("root.sg9.d1.v1", schemaList3, 100);
+ tablet1.setAligned(true);
+ tablet2.setAligned(true);
+ tablet3.setAligned(true);
+
+ Map<String, Tablet> tabletMap = new HashMap<>();
+ tabletMap.put("root.sg7.d1.v1", tablet1);
+ tabletMap.put("root.sg8.d1.v1", tablet2);
+ tabletMap.put("root.sg9.d1.v1", tablet3);
+
+ // Method 1 to add tablet data
+ for (long row = 1; row <= 3; row++) {
+ int row1 = tablet1.rowSize++;
+ int row2 = tablet2.rowSize++;
+ int row3 = tablet3.rowSize++;
+ tablet1.addTimestamp(row1, row);
+ tablet2.addTimestamp(row2, row);
+ tablet3.addTimestamp(row3, row);
+ for (int i = 0; i < 3; i++) {
+ tablet1.addValue(schemaList1.get(0).getSubMeasurementsList().get(i), row1, values.get(i));
+ tablet2.addValue(schemaList2.get(0).getSubMeasurementsList().get(i), row2, values.get(i));
+ tablet3.addValue(schemaList3.get(0).getSubMeasurementsList().get(i), row3, values.get(i));
+ }
+ }
+ session.insertTablets(tabletMap, true);
+
+ tabletMap.clear();
+ tablet1.setPrefixPath("root.sg0.d5.v1");
+ tablet2.setPrefixPath("root.sg0.d6.v1");
+ tablet3.setPrefixPath("root.sg0.d7.v1");
+ tabletMap.put("root.sg0.d5.v1", tablet1);
+ tabletMap.put("root.sg0.d6.v1", tablet2);
+ tabletMap.put("root.sg0.d7.v1", tablet3);
+ session.insertTablets(tabletMap, true);
+ tablet1.reset();
+ tablet2.reset();
+ tablet3.reset();
+
+ for (Statement readStatement : readStatements) {
+ for (int d = 1; d <= 7; d++) {
+ ResultSet resultSet =
+ readStatement.executeQuery(String.format("SELECT * from root.sg0.d%d.v1", d));
+ for (long t = 1; t <= 3; t++) {
+ Assert.assertTrue(resultSet.next());
+ Assert.assertEquals(resultSet.getLong(1), t);
+ Assert.assertEquals(resultSet.getString(2), "1");
+ Assert.assertEquals(resultSet.getString(3), "2");
+ Assert.assertEquals(resultSet.getString(4), "3.0");
+ }
+ }
+
+ for (int sg = 1; sg <= 9; sg++) {
+ ResultSet resultSet =
+ readStatement.executeQuery(String.format("SELECT * from root.sg%d.d1.v1", sg));
+ for (long t = 1; t <= 3; t++) {
+ Assert.assertTrue(resultSet.next());
+ Assert.assertEquals(resultSet.getLong(1), t);
+ Assert.assertEquals(resultSet.getString(2), "1");
+ Assert.assertEquals(resultSet.getString(3), "2");
+ Assert.assertEquals(resultSet.getString(4), "3.0");
+ }
+ }
+ }
+ }
+
+ @Test
public void testInsertTabletWithNullValues()
throws IoTDBConnectionException, StatementExecutionException, SQLException {
List<IMeasurementSchema> schemaList = new ArrayList<>();