This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this push: new f1ec46511f3 Table model ingestion, fix sql insert problem f1ec46511f3 is described below commit f1ec46511f3edb6315edc44635f35711f8cfbc2b Author: Jiang Tian <jt2594...@163.com> AuthorDate: Tue Jul 23 14:25:54 2024 +0800 Table model ingestion, fix sql insert problem --- .../iotdb/session/it/IoTDBSessionRelationalIT.java | 126 +++++++++++++++++---- .../planner/plan/node/write/InsertTabletNode.java | 6 +- .../plan/node/write/RelationalInsertRowNode.java | 22 ++-- .../node/write/RelationalInsertTabletNode.java | 36 +++--- .../fetcher/TableHeaderSchemaValidator.java | 2 +- .../relational/sql/ast/WrappedInsertStatement.java | 27 +++-- .../plan/statement/crud/InsertTabletStatement.java | 8 +- .../iotdb/db/storageengine/StorageEngine.java | 2 +- .../dataregion/memtable/TsFileProcessor.java | 9 +- 9 files changed, 175 insertions(+), 63 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java index 3a06c9fe433..d91db30723a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java @@ -29,7 +29,6 @@ import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.record.Tablet.ColumnType; @@ -43,9 +42,7 @@ import org.junit.runner.RunWith; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT; @@ -75,7 +72,77 @@ public class IoTDBSessionRelationalIT { // for manual debugging public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { - insertRelationalTabletPerformanceTest(); + try (ISession session = + new Session.Builder().host("127.0.0.1").port(6667).sqlDialect(TABLE_SQL_DIALECT).build()) { + session.open(); + + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS \"db1\""); + + session.executeNonQueryStatement("USE \"db1\""); + // only one column in this table, and others should be auto-created + session.executeNonQueryStatement("CREATE TABLE table1 (id1 string id)"); + + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("id2", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE)); + final List<ColumnType> columnTypes = + Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, ColumnType.MEASUREMENT); + + long timestamp = 0; + Tablet tablet = new Tablet("table1", schemaList, columnTypes, 15); + + for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + row); + tablet.addValue("id2", rowIndex, "id:" + row); + tablet.addValue("attr1", rowIndex, "attr:" + row); + tablet.addValue("m1", rowIndex, row * 1.0); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertRelationalTablet(tablet, true); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertRelationalTablet(tablet); + tablet.reset(); + } + + session.executeNonQueryStatement("FLush"); + + for (long row = 15; row < 30; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + row); + tablet.addValue("id2", rowIndex, "id:" + row); + tablet.addValue("attr1", rowIndex, "attr:" + row); + tablet.addValue("m1", rowIndex, row * 1.0); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertRelationalTablet(tablet, true); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertRelationalTablet(tablet); + tablet.reset(); + } + + SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + timestamp = rowRecord.getFields().get(0).getLongV(); + // id 1 should be null + assertNull(rowRecord.getFields().get(1).getDataType()); + assertEquals("id:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); + assertEquals("attr:" + timestamp, rowRecord.getFields().get(3).getBinaryV().toString()); + assertEquals(timestamp * 1.0, rowRecord.getFields().get(4).getDoubleV(), 0.0001); + cnt++; + } + assertEquals(30, cnt); + } + // insertRelationalTabletPerformanceTest(); } private static void insertRelationalTabletPerformanceTest() @@ -162,8 +229,8 @@ public class IoTDBSessionRelationalIT { for (long row = 0; row < 15; row++) { session.executeNonQueryStatement( String.format( - "INSERT INTO table1 (id1, attr1, m1) VALUES ('%s', '%s', %f)", - "id:" + row, "attr:" + row, row * 1.0)); + "INSERT INTO table1 (time, id1, attr1, m1) VALUES (%d, '%s', '%s', %f)", + row, "id:" + row, "attr:" + row, row * 1.0)); } session.executeNonQueryStatement("FLush"); @@ -171,18 +238,21 @@ public class IoTDBSessionRelationalIT { for (long row = 15; row < 30; row++) { session.executeNonQueryStatement( String.format( - "INSERT INTO table1 (id1, attr1, m1) VALUES ('%s', '%s', %f)", - "id:" + row, "attr:" + row, row * 1.0)); + "INSERT INTO table1 (time, id1, attr1, m1) VALUES (%d, '%s', '%s', %f)", + row, "id:" + row, "attr:" + row, row * 1.0)); } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); // sql cannot create column assertThrows( @@ -193,14 +263,6 @@ public class IoTDBSessionRelationalIT { "INSERT INTO table1 (id1, id2, attr1, m1) VALUES ('%s', '%s', '%s', %f)", "id:" + 100, "id:" + 100, "attr:" + 100, 100 * 1.0))); } - Map<String, ChunkMetadata> chunkMetadataMap = new HashMap<>(); - List<ChunkMetadata> valueChunkMetadataList = new ArrayList<>(); - chunkMetadataMap.computeIfPresent( - "", - (k, v) -> { - valueChunkMetadataList.add(v); - return v; - }); } @Test @@ -269,6 +331,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -277,9 +340,9 @@ public class IoTDBSessionRelationalIT { assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); // "m2" should not be present assertEquals(4, rowRecord.getFields().size()); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } finally { EnvFactory.getEnv().getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); } @@ -326,13 +389,16 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } @@ -438,6 +504,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -446,9 +513,9 @@ public class IoTDBSessionRelationalIT { assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); // "m2" should not be present assertEquals(4, rowRecord.getFields().size()); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } finally { EnvFactory.getEnv().getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); } @@ -511,6 +578,7 @@ public class IoTDBSessionRelationalIT { tablet.reset(); } + int cnt = 0; SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); @@ -518,7 +586,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } @@ -558,6 +628,7 @@ public class IoTDBSessionRelationalIT { "table1", timestamp + row, measurementIds, dataTypes, columnTypes, values); } + int cnt = 0; SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); @@ -565,7 +636,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } @@ -625,15 +698,16 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } } @@ -693,6 +767,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -701,9 +776,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(3).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(4).getDoubleV(), 0.0001); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } } @@ -752,6 +827,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -759,7 +835,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id2:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals("attr1:" + timestamp, rowRecord.getFields().get(3).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(4).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index be5e26f7858..4f46c6fda54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -631,7 +631,11 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { case STRING: Binary[] binaryValues = (Binary[]) column; for (int j = 0; j < rowCount; j++) { - ReadWriteIOUtils.write(binaryValues[j], buffer); + if (binaryValues[j] != null) { + ReadWriteIOUtils.write(binaryValues[j], buffer); + } else { + ReadWriteIOUtils.write(0, buffer); + } } break; default: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java index 97dd3d3673e..df9f9174ce9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java @@ -181,31 +181,37 @@ public class RelationalInsertRowNode extends InsertRowNode { @Override void subSerialize(ByteBuffer buffer) { super.subSerialize(buffer); - for (int i = 0; i < dataTypes.length; i++) { - columnCategories[i].serialize(buffer); + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + columnCategories[i].serialize(buffer); + } } } @Override void subSerialize(DataOutputStream stream) throws IOException { super.subSerialize(stream); - for (int i = 0; i < dataTypes.length; i++) { - columnCategories[i].serialize(stream); + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + columnCategories[i].serialize(stream); + } } } @Override protected void subSerialize(IWALByteBufferView buffer) { super.subSerialize(buffer); - for (int i = 0; i < dataTypes.length; i++) { - buffer.put(columnCategories[i].getCategory()); + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + buffer.put(columnCategories[i].getCategory()); + } } } public void subDeserialize(ByteBuffer buffer) { super.subDeserialize(buffer); - TsTableColumnCategory[] newColumnCategories = new TsTableColumnCategory[dataTypes.length]; - for (int i = 0; i < dataTypes.length; i++) { + TsTableColumnCategory[] newColumnCategories = new TsTableColumnCategory[measurements.length]; + for (int i = 0; i < measurements.length; i++) { newColumnCategories[i] = TsTableColumnCategory.deserialize(buffer); } setColumnCategories(newColumnCategories); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 6d0b6cccbb4..3902c9ae9b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -103,7 +103,11 @@ public class RelationalInsertTabletNode extends InsertTabletNode { for (int i = 0; i < idColumnIndices.size(); i++) { final Integer columnIndex = idColumnIndices.get(i); Object idSeg = ((Object[]) columns[columnIndex])[rowIdx]; - deviceIdSegments[i + 1] = idSeg != null ? idSeg.toString() : null; + boolean isNull = + bitMaps != null + && bitMaps[columnIndex] != null + && bitMaps[columnIndex].isMarked(rowIdx); + deviceIdSegments[i + 1] = !isNull && idSeg != null ? idSeg.toString() : null; } deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments); } @@ -144,23 +148,27 @@ public class RelationalInsertTabletNode extends InsertTabletNode { @Override protected void serializeAttributes(ByteBuffer byteBuffer) { super.serializeAttributes(byteBuffer); - for (int i = 0; i < dataTypes.length; i++) { - columnCategories[i].serialize(byteBuffer); + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + columnCategories[i].serialize(byteBuffer); + } } } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { super.serializeAttributes(stream); - for (int i = 0; i < dataTypes.length; i++) { - columnCategories[i].serialize(stream); + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + columnCategories[i].serialize(stream); + } } } public void subDeserialize(ByteBuffer buffer) { super.subDeserialize(buffer); - TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[dataTypes.length]; - for (int i = 0; i < dataTypes.length; i++) { + TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[measurements.length]; + for (int i = 0; i < measurements.length; i++) { columnCategories[i] = TsTableColumnCategory.deserialize(buffer); } setColumnCategories(columnCategories); @@ -168,16 +176,18 @@ public class RelationalInsertTabletNode extends InsertTabletNode { void subSerialize(IWALByteBufferView buffer, int start, int end) { super.subSerialize(buffer, start, end); - for (int i = 0; i < dataTypes.length; i++) { - buffer.put(columnCategories[i].getCategory()); + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + buffer.put(columnCategories[i].getCategory()); + } } } @Override protected void subDeserializeFromWAL(ByteBuffer buffer) { super.subDeserializeFromWAL(buffer); - TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[dataTypes.length]; - for (int i = 0; i < dataTypes.length; i++) { + TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[measurements.length]; + for (int i = 0; i < measurements.length; i++) { columnCategories[i] = TsTableColumnCategory.deserialize(buffer); } setColumnCategories(columnCategories); @@ -186,8 +196,8 @@ public class RelationalInsertTabletNode extends InsertTabletNode { @Override protected void subDeserializeFromWAL(DataInputStream stream) throws IOException { super.subDeserializeFromWAL(stream); - TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[dataTypes.length]; - for (int i = 0; i < dataTypes.length; i++) { + TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[measurements.length]; + for (int i = 0; i < measurements.length; i++) { columnCategories[i] = TsTableColumnCategory.deserialize(stream); } setColumnCategories(columnCategories); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java index b6faadf5ebb..3c51cd4d195 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java @@ -134,7 +134,7 @@ public class TableHeaderSchemaValidator { // TODO table metadata: authority check for table alter // check id or attribute column data type in this method autoCreateColumn(database, tableSchema.getTableName(), missingColumnList, context); - resultColumnList.addAll(missingColumnList); + table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); } table diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java index a609d5f5218..b78830a1dab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java @@ -74,16 +74,21 @@ public abstract class WrappedInsertStatement extends WrappedStatement List<ColumnSchema> columnSchemas = new ArrayList<>(insertBaseStatement.getMeasurements().length); for (int i = 0; i < insertBaseStatement.getMeasurements().length; i++) { - columnSchemas.add( - new ColumnSchema( - insertBaseStatement.getMeasurements()[i], - insertBaseStatement.getDataTypes() != null - ? TypeFactory.getType(insertBaseStatement.getDataTypes()[i]) - : null, - false, - insertBaseStatement.getColumnCategories() != null - ? insertBaseStatement.getColumnCategories()[i] - : null)); + if (insertBaseStatement.getMeasurements()[i] != null) { + columnSchemas.add( + new ColumnSchema( + insertBaseStatement.getMeasurements()[i], + insertBaseStatement.getDataTypes() != null + && insertBaseStatement.getDataTypes()[i] != null + ? TypeFactory.getType(insertBaseStatement.getDataTypes()[i]) + : null, + false, + insertBaseStatement.getColumnCategories() != null + ? insertBaseStatement.getColumnCategories()[i] + : null)); + } else { + columnSchemas.add(null); + } } return new TableSchema(tableName, columnSchemas); } @@ -122,7 +127,7 @@ public abstract class WrappedInsertStatement extends WrappedStatement */ public void adjustIdColumns( List<ColumnSchema> realIdColumnSchemas, final InsertBaseStatement baseStatement) { - List<ColumnSchema> incomingColumnSchemas = getTableSchema().getColumns(); + List<ColumnSchema> incomingColumnSchemas = toTableSchema(baseStatement).getColumns(); for (int realIdColPos = 0; realIdColPos < realIdColumnSchemas.size(); realIdColPos++) { ColumnSchema realColumn = realIdColumnSchemas.get(realIdColPos); int incomingIdColPos = incomingColumnSchemas.indexOf(realColumn); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index ce3fb28f913..3bfe7d8fc33 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -466,7 +466,13 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem public void insertColumn(int pos, ColumnSchema columnSchema) { super.insertColumn(pos, columnSchema); - if (bitMaps != null) { + if (bitMaps == null) { + bitMaps = new BitMap[measurements.length]; + bitMaps[pos] = new BitMap(rowCount); + for (int i = 0; i < rowCount; i++) { + bitMaps[pos].mark(i); + } + } else { BitMap[] tmpBitmaps = new BitMap[bitMaps.length + 1]; System.arraycopy(bitMaps, 0, tmpBitmaps, 0, pos); tmpBitmaps[pos] = new BitMap(rowCount); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 8758af3aa00..5035b389279 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -578,7 +578,7 @@ public class StorageEngine implements IService { } public void operateFlush(TFlushReq req) { - if (req.storageGroups == null) { + if (req.storageGroups == null || req.storageGroups.isEmpty()) { StorageEngine.getInstance().syncCloseAllProcessor(); WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 951f655c5e4..169428dbeb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -568,15 +568,18 @@ public class TsFileProcessor { // For sequence tsfile, we update the endTime only when the file is prepared to be closed. // For unsequence tsfile, we have to update the endTime for each insertion. tsFileResource.updateEndTime( - deviceEndOffsetPairs.get(0).left, deviceEndOffsetPairs.get(0).right); + deviceEndOffsetPairs.get(0).left, + insertTabletNode.getTimes()[deviceEndOffsetPairs.get(0).right - 1]); } for (int i = 1; i < deviceEndOffsetPairs.size(); i++) { // the end offset of i - 1 is the start offset of i tsFileResource.updateStartTime( - deviceEndOffsetPairs.get(i).left, deviceEndOffsetPairs.get(i - 1).right); + deviceEndOffsetPairs.get(i).left, + insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i - 1).right]); if (!sequence) { tsFileResource.updateEndTime( - deviceEndOffsetPairs.get(i).left, deviceEndOffsetPairs.get(i).right); + deviceEndOffsetPairs.get(i).left, + insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i).right - 1]); } }