This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_all_column_failed_empty_tsfile
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 96f252d4d26052d06388a020eccf9293d7d9e850
Author: HTHou <[email protected]>
AuthorDate: Wed Nov 1 23:41:16 2023 +0800

    Fix empty tsfile generated when all datatypes in one insert mismatch
---
 .../plan/planner/plan/node/write/InsertNode.java   |   7 ++
 .../db/storageengine/dataregion/DataRegion.java    |   7 +-
 .../storageengine/dataregion/DataRegionTest.java   | 120 +++++++++++++++++++++
 3 files changed, 132 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 69beae45ddd..8d29348c702 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -280,6 +280,13 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
   public int getFailedMeasurementNumber() {
     return failedMeasurementNumber;
   }
+
+  public boolean allMeasurementFailed() {
+    if (measurements != null) {
+      return failedMeasurementNumber >= measurements.length;
+    }
+    return true;
+  }
   // endregion
 
   // region progress index
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3d63bc658ba..ca572a9aaa1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1034,8 +1034,8 @@ public class DataRegion implements IDataRegionForQuery {
       boolean sequence,
       TSStatus[] results,
       long timePartitionId) {
-    // return when start >= end
-    if (start >= end) {
+    // return when start >= end or all measurement failed
+    if (start >= end || insertTabletNode.allMeasurementFailed()) {
       return true;
     }
 
@@ -1101,6 +1101,9 @@ public class DataRegion implements IDataRegionForQuery {
   private void insertToTsFileProcessor(
       InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
       throws WriteProcessException {
+    if (insertRowNode.allMeasurementFailed()) {
+      return;
+    }
     TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, sequence);
     if (tsFileProcessor == null) {
       return;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 4adb806b74b..1479ba63f05 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -384,6 +384,86 @@ public class DataRegionTest {
     }
   }
 
+  @Test
+  public void testAllMeasurementsFailedTabletWriteAndSyncClose()
+      throws QueryProcessException, IllegalPathException, 
WriteProcessException {
+    String[] measurements = new String[2];
+    measurements[0] = "s0";
+    measurements[1] = "s1";
+    TSDataType[] dataTypes = new TSDataType[2];
+    dataTypes[0] = TSDataType.INT32;
+    dataTypes[1] = TSDataType.INT64;
+
+    MeasurementSchema[] measurementSchemas = new MeasurementSchema[2];
+    measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, 
TSEncoding.PLAIN);
+    measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, 
TSEncoding.PLAIN);
+
+    long[] times = new long[100];
+    Object[] columns = new Object[2];
+    columns[0] = new int[100];
+    columns[1] = new long[100];
+
+    for (int r = 0; r < 100; r++) {
+      times[r] = r;
+      ((int[]) columns[0])[r] = 1;
+      ((long[]) columns[1])[r] = 1;
+    }
+
+    InsertTabletNode insertTabletNode1 =
+        new InsertTabletNode(
+            new QueryId("test_write").genPlanNodeId(),
+            new PartialPath("root.vehicle.d0"),
+            false,
+            measurements,
+            dataTypes,
+            measurementSchemas,
+            times,
+            null,
+            columns,
+            times.length);
+    insertTabletNode1.setFailedMeasurementNumber(2);
+
+    dataRegion.insertTablet(insertTabletNode1);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+    for (int r = 50; r < 149; r++) {
+      times[r - 50] = r;
+      ((int[]) columns[0])[r - 50] = 1;
+      ((long[]) columns[1])[r - 50] = 1;
+    }
+
+    InsertTabletNode insertTabletNode2 =
+        new InsertTabletNode(
+            new QueryId("test_write").genPlanNodeId(),
+            new PartialPath("root.vehicle.d0"),
+            false,
+            measurements,
+            dataTypes,
+            measurementSchemas,
+            times,
+            null,
+            columns,
+            times.length);
+    insertTabletNode2.setFailedMeasurementNumber(2);
+
+    dataRegion.insertTablet(insertTabletNode2);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new PartialPath(deviceId, 
measurementId)),
+            deviceId,
+            context,
+            null);
+
+    Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    for (TsFileResource resource : queryDataSource.getSeqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
   @Test
   public void testSeqAndUnSeqSyncClose()
       throws WriteProcessException, QueryProcessException, 
IllegalPathException {
@@ -420,6 +500,46 @@ public class DataRegionTest {
     }
   }
 
+  @Test
+  public void testAllMeasurementsFailedRecordSeqAndUnSeqSyncClose()
+      throws WriteProcessException, QueryProcessException, 
IllegalPathException {
+    for (int j = 21; j <= 30; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(j)));
+      InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record);
+      rowNode.setFailedMeasurementNumber(1);
+      dataRegion.insert(rowNode);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    }
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    for (int j = 10; j >= 1; j--) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(j)));
+      InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record);
+      rowNode.setFailedMeasurementNumber(1);
+      dataRegion.insert(rowNode);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    }
+
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new PartialPath(deviceId, 
measurementId)),
+            deviceId,
+            context,
+            null);
+    Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    for (TsFileResource resource : queryDataSource.getSeqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+    for (TsFileResource resource : queryDataSource.getUnseqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
   @Test
   public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
       throws WriteProcessException, QueryProcessException, 
IllegalPathException, IOException {

Reply via email to