This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_all_column_failed_empty_tsfile in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 96f252d4d26052d06388a020eccf9293d7d9e850 Author: HTHou <[email protected]> AuthorDate: Wed Nov 1 23:41:16 2023 +0800 Fix empty tsfile genarated when all datatypes in one insert mismatching --- .../plan/planner/plan/node/write/InsertNode.java | 7 ++ .../db/storageengine/dataregion/DataRegion.java | 7 +- .../storageengine/dataregion/DataRegionTest.java | 120 +++++++++++++++++++++ 3 files changed, 132 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index 69beae45ddd..8d29348c702 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -280,6 +280,13 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons public int getFailedMeasurementNumber() { return failedMeasurementNumber; } + + public boolean allMeasurementFailed() { + if (measurements != null) { + return failedMeasurementNumber >= measurements.length; + } + return true; + } // endregion // region progress index diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 3d63bc658ba..ca572a9aaa1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1034,8 +1034,8 @@ public class DataRegion implements IDataRegionForQuery { boolean sequence, TSStatus[] results, long timePartitionId) { - // return when start >= end - if (start >= end) { + // return when start >= end or all measurement failed + if (start >= end || insertTabletNode.allMeasurementFailed()) { return true; } @@ -1101,6 +1101,9 @@ public class DataRegion implements IDataRegionForQuery { private void insertToTsFileProcessor( InsertRowNode insertRowNode, boolean sequence, long timePartitionId) throws WriteProcessException { + if (insertRowNode.allMeasurementFailed()) { + return; + } TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); if (tsFileProcessor == null) { return; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index 4adb806b74b..1479ba63f05 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -384,6 +384,86 @@ public class DataRegionTest { } } + @Test + public void testAllMeasurementsFailedTabletWriteAndSyncClose() + throws QueryProcessException, IllegalPathException, WriteProcessException { + String[] measurements = new String[2]; + measurements[0] = "s0"; + measurements[1] = "s1"; + TSDataType[] dataTypes = new TSDataType[2]; + dataTypes[0] = TSDataType.INT32; + dataTypes[1] = TSDataType.INT64; + + MeasurementSchema[] measurementSchemas = new MeasurementSchema[2]; + measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN); + measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN); + + long[] times = new long[100]; + Object[] columns = new Object[2]; + columns[0] = new int[100]; + columns[1] = new long[100]; + + for (int r = 0; r < 100; r++) { + times[r] = r; + ((int[]) columns[0])[r] = 1; + ((long[]) columns[1])[r] = 1; + } + + InsertTabletNode insertTabletNode1 = + new InsertTabletNode( + new QueryId("test_write").genPlanNodeId(), + new PartialPath("root.vehicle.d0"), + false, + measurements, + dataTypes, + measurementSchemas, + times, + null, + columns, + times.length); + insertTabletNode1.setFailedMeasurementNumber(2); + + dataRegion.insertTablet(insertTabletNode1); + dataRegion.asyncCloseAllWorkingTsFileProcessors(); + + for (int r = 50; r < 149; r++) { + times[r - 50] = r; + ((int[]) columns[0])[r - 50] = 1; + ((long[]) columns[1])[r - 50] = 1; + } + + InsertTabletNode insertTabletNode2 = + new InsertTabletNode( + new QueryId("test_write").genPlanNodeId(), + new PartialPath("root.vehicle.d0"), + false, + measurements, + dataTypes, + measurementSchemas, + times, + null, + columns, + times.length); + insertTabletNode2.setFailedMeasurementNumber(2); + + dataRegion.insertTablet(insertTabletNode2); + dataRegion.asyncCloseAllWorkingTsFileProcessors(); + dataRegion.syncCloseAllWorkingTsFileProcessors(); + + QueryDataSource queryDataSource = + dataRegion.query( + Collections.singletonList(new PartialPath(deviceId, measurementId)), + deviceId, + context, + null); + + Assert.assertEquals(0, queryDataSource.getSeqResources().size()); + Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); + for (TsFileResource resource : queryDataSource.getSeqResources()) { + Assert.assertTrue(resource.isClosed()); + } + } + @Test public void testSeqAndUnSeqSyncClose() throws WriteProcessException, QueryProcessException, IllegalPathException { @@ -420,6 +500,46 @@ public class DataRegionTest { } } + @Test + public void testAllMeasurementsFailedRecordSeqAndUnSeqSyncClose() + throws WriteProcessException, QueryProcessException, IllegalPathException { + for (int j = 21; j <= 30; j++) { + TSRecord record = new TSRecord(j, deviceId); + record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); + InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.setFailedMeasurementNumber(1); + dataRegion.insert(rowNode); + dataRegion.asyncCloseAllWorkingTsFileProcessors(); + } + dataRegion.syncCloseAllWorkingTsFileProcessors(); + + for (int j = 10; j >= 1; j--) { + TSRecord record = new TSRecord(j, deviceId); + record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); + InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.setFailedMeasurementNumber(1); + dataRegion.insert(rowNode); + dataRegion.asyncCloseAllWorkingTsFileProcessors(); + } + + dataRegion.syncCloseAllWorkingTsFileProcessors(); + + QueryDataSource queryDataSource = + dataRegion.query( + Collections.singletonList(new PartialPath(deviceId, measurementId)), + deviceId, + context, + null); + Assert.assertEquals(0, queryDataSource.getSeqResources().size()); + Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); + for (TsFileResource resource : queryDataSource.getSeqResources()) { + Assert.assertTrue(resource.isClosed()); + } + for (TsFileResource resource : queryDataSource.getUnseqResources()) { + Assert.assertTrue(resource.isClosed()); + } + } + @Test public void testEnableDiscardOutOfOrderDataForInsertRowPlan() throws WriteProcessException, QueryProcessException, IllegalPathException, IOException {
