This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch insert-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/insert-fix by this push:
     new 404ea0c0837 insert-fix
404ea0c0837 is described below

commit 404ea0c0837296ced2f09eab05c232600eeada40
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 28 16:52:04 2026 +0800

    insert-fix
---
 .../planner/plan/node/write/InsertTabletNode.java  |  60 ++++-
 .../node/write/RelationalInsertTabletNode.java     |   6 +-
 .../db/storageengine/dataregion/DataRegion.java    | 275 +++++++++++++--------
 .../storageengine/dataregion/DataRegionTest.java   | 227 +++++++++++++++++
 4 files changed, 458 insertions(+), 110 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 490f16ca5f0..995e8a95e3f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1193,36 +1193,72 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     if (lastIdx < startOffset) {
       return null;
     }
+    return composeTimeValuePair(measurementIndex, lastIdx);
+  }
+
+  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+    return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+  }
+
+  protected TimeValuePair composeLastTimeValuePair(
+      final int measurementIndex,
+      final TSStatus[] results,
+      final int startOffset,
+      final int endOffset) {
+    if (results == null) {
+      return composeLastTimeValuePair(measurementIndex, startOffset, 
endOffset);
+    }
+    if (measurementIndex >= columns.length || 
Objects.isNull(dataTypes[measurementIndex])) {
+      return null;
+    }
+
+    final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex];
+    int lastIdx = Math.min(endOffset - 1, rowCount - 1);
+    while (lastIdx >= startOffset) {
+      if (results[lastIdx] != null
+          && results[lastIdx].getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        lastIdx--;
+        continue;
+      }
+      if (bitMap != null && bitMap.isMarked(lastIdx)) {
+        lastIdx--;
+        continue;
+      }
+      break;
+    }
+    return lastIdx < startOffset ? null : 
composeTimeValuePair(measurementIndex, lastIdx);
+  }
 
+  private TimeValuePair composeTimeValuePair(final int measurementIndex, final 
int rowIndex) {
     TsPrimitiveType value;
     switch (dataTypes[measurementIndex]) {
       case INT32:
       case DATE:
         int[] intValues = (int[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsInt(intValues[lastIdx]);
+        value = new TsPrimitiveType.TsInt(intValues[rowIndex]);
         break;
       case INT64:
       case TIMESTAMP:
         long[] longValues = (long[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsLong(longValues[lastIdx]);
+        value = new TsPrimitiveType.TsLong(longValues[rowIndex]);
         break;
       case FLOAT:
         float[] floatValues = (float[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]);
+        value = new TsPrimitiveType.TsFloat(floatValues[rowIndex]);
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]);
+        value = new TsPrimitiveType.TsDouble(doubleValues[rowIndex]);
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]);
+        value = new TsPrimitiveType.TsBoolean(boolValues[rowIndex]);
         break;
       case TEXT:
       case BLOB:
       case STRING:
         Binary[] binaryValues = (Binary[]) columns[measurementIndex];
-        value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
+        value = new TsPrimitiveType.TsBinary(binaryValues[rowIndex]);
         break;
       case OBJECT:
         return null;
@@ -1230,11 +1266,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         throw new UnSupportedDataTypeException(
             String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
     }
-    return new TimeValuePair(times[lastIdx], value);
-  }
-
-  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
-    return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+    return new TimeValuePair(times[rowIndex], value);
   }
 
   public IDeviceID getDeviceID(int rowIdx) {
@@ -1313,10 +1345,14 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   public void updateLastCache(final String databaseName) {
+    updateLastCache(databaseName, null);
+  }
+
+  public void updateLastCache(final String databaseName, final TSStatus[] 
results) {
     final String[] rawMeasurements = getRawMeasurements();
     final TimeValuePair[] timeValuePairs = new 
TimeValuePair[rawMeasurements.length];
     for (int i = 0; i < rawMeasurements.length; i++) {
-      timeValuePairs[i] = composeLastTimeValuePair(i);
+      timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
     }
     TreeDeviceSchemaCacheManager.getInstance()
         .updateLastCacheIfExists(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index b9f7edbe87c..8d24ad77364 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -371,6 +371,10 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
 
   @Override
   public void updateLastCache(final String databaseName) {
+    updateLastCache(databaseName, null);
+  }
+
+  public void updateLastCache(final String databaseName, final TSStatus[] 
results) {
     final String[] rawMeasurements = getRawMeasurements();
 
     final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs = 
splitByDevice(0, rowCount);
@@ -381,7 +385,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
 
       final TimeValuePair[] timeValuePairs = new 
TimeValuePair[rawMeasurements.length];
       for (int i = 0; i < rawMeasurements.length; i++) {
-        timeValuePairs[i] = composeLastTimeValuePair(i, startOffset, 
endOffset);
+        timeValuePairs[i] = composeLastTimeValuePair(i, results, startOffset, 
endOffset);
       }
       TableDeviceSchemaCache.getInstance()
           .updateLastCacheIfExists(databaseName, deviceID, rawMeasurements, 
timeValuePairs);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 933efbae84c..e5d75b4a210 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1455,7 +1455,7 @@ public class DataRegion implements IDataRegionForQuery {
         && !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
       // disable updating last cache on follower
       long startTime = System.nanoTime();
-      tryToUpdateInsertTabletLastCache(insertTabletNode);
+      tryToUpdateInsertTabletLastCache(insertTabletNode, results);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
     }
     return noFailure;
@@ -1506,18 +1506,12 @@ public class DataRegion implements IDataRegionForQuery {
       return true;
     }
 
-    TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, sequence);
-    if (tsFileProcessor == null) {
-      for (int[] rangePair : rangeList) {
-        int start = rangePair[0];
-        int end = rangePair[1];
-        for (int i = start; i < end; i++) {
-          results[i] =
-              RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR,
-                  "can not create TsFileProcessor, timePartitionId: " + 
timePartitionId);
-        }
-      }
+    final TsFileProcessor tsFileProcessor;
+    try {
+      tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
+    } catch (WriteProcessException e) {
+      markInsertTabletRangesFailed(
+          rangeList, results, RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
       return false;
     }
 
@@ -1546,6 +1540,15 @@ public class DataRegion implements IDataRegionForQuery {
     return true;
   }
 
+  private void markInsertTabletRangesFailed(
+      final List<int[]> rangeList, final TSStatus[] results, final TSStatus 
failureStatus) {
+    for (int[] rangePair : rangeList) {
+      for (int i = rangePair[0]; i < rangePair[1]; i++) {
+        results[i] = failureStatus;
+      }
+    }
+  }
+
   private TableSchema getTableSchemaFromCache(
       final String database, final String tableName, final Pair<Long, Long> 
currentVersion) {
     final TableSchemaCacheKey key = new TableSchemaCacheKey(database, 
tableName);
@@ -1679,6 +1682,11 @@ public class DataRegion implements IDataRegionForQuery {
     node.updateLastCache(getDatabaseName());
   }
 
+  private void tryToUpdateInsertTabletLastCache(
+      final InsertTabletNode node, final TSStatus[] results) {
+    node.updateLastCache(getDatabaseName(), results);
+  }
+
   private TsFileProcessor insertToTsFileProcessor(
       InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
       throws WriteProcessException {
@@ -1686,19 +1694,16 @@ public class DataRegion implements IDataRegionForQuery {
       return null;
     }
     TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, sequence);
-    if (tsFileProcessor == null) {
-      return null;
-    }
     long[] infoForMetrics = new long[5];
     // infoForMetrics[0]: CreateMemtableBlockTimeCost
     // infoForMetrics[1]: ScheduleMemoryBlockTimeCost
     // infoForMetrics[2]: ScheduleWalTimeCost
     // infoForMetrics[3]: ScheduleMemTableTimeCost
     // infoForMetrics[4]: InsertedPointsNumber
-    tsFileProcessor.insert(insertRowNode, infoForMetrics);
-    updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
     // register TableSchema (and maybe more) for table insertion
     registerToTsFile(insertRowNode, tsFileProcessor);
+    tsFileProcessor.insert(insertRowNode, infoForMetrics);
+    updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
     return tsFileProcessor;
   }
 
@@ -1717,9 +1722,11 @@ public class DataRegion implements IDataRegionForQuery {
       if (insertRowNode.allMeasurementFailed()) {
         continue;
       }
-      TsFileProcessor tsFileProcessor =
-          getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
-      if (tsFileProcessor == null) {
+      final TsFileProcessor tsFileProcessor;
+      try {
+        tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i], 
areSequence[i]);
+      } catch (WriteProcessException e) {
+        insertRowsNode.getResults().put(i, 
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
         continue;
       }
       int finalI = i;
@@ -1727,78 +1734,156 @@ public class DataRegion implements IDataRegionForQuery {
           tsFileProcessor,
           (k, v) -> {
             if (v == null) {
-              v = insertRowsNode.emptyClone();
-              v.setSearchIndex(insertRowNode.getSearchIndex());
-              v.setAligned(insertRowNode.isAligned());
-              if (insertRowNode.isGeneratedByPipe()) {
-                v.markAsGeneratedByPipe();
-              }
-              if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-                v.markAsGeneratedByRemoteConsensusLeader();
-              }
+              v = createGroupedInsertRowsNode(insertRowsNode, insertRowNode);
             }
-            if (v.isAligned() != insertRowNode.isAligned()) {
-              v.setMixingAlignment(true);
-            }
-            v.addOneInsertRowNode(insertRowNode, finalI);
-            v.updateProgressIndex(insertRowNode.getProgressIndex());
+            appendInsertRowNode(v, insertRowNode, finalI);
             return v;
           });
     }
 
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
     for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
-      TsFileProcessor tsFileProcessor = entry.getKey();
       InsertRowsNode subInsertRowsNode = entry.getValue();
       try {
-        tsFileProcessor =
-            insertRowsWithTypeConsistencyCheck(tsFileProcessor, 
subInsertRowsNode, infoForMetrics);
+        List<TsFileProcessor> insertedProcessors =
+            insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, infoForMetrics);
+        
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+        for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+          // check memtable size and may asyncTryToFlush the work memtable
+          if (tsFileProcessor.shouldFlush()) {
+            fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+          }
+        }
       } catch (WriteProcessException e) {
-        insertRowsNode
-            .getResults()
-            .put(
-                subInsertRowsNode.getInsertRowNodeIndexList().get(0),
-                RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-      }
-      
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
-      // check memtable size and may asyncTryToFlush the work memtable
-      if (entry.getKey().shouldFlush()) {
-        fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+        recordInsertRowsFailure(insertRowsNode, subInsertRowsNode, e);
       }
     }
     return executedInsertRowNodeList;
   }
 
-  private TsFileProcessor insertRowsWithTypeConsistencyCheck(
+  private List<TsFileProcessor> insertRowsWithTypeConsistencyCheck(
       TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, 
long[] infoForMetrics)
       throws WriteProcessException {
     try {
       // register TableSchema (and maybe more) for table insertion
       registerToTsFile(subInsertRowsNode, tsFileProcessor);
       tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+      return Collections.singletonList(tsFileProcessor);
     } catch (DataTypeInconsistentException e) {
       InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0);
       long timePartitionId = 
TimePartitionUtils.getTimePartitionId(firstRow.getTime());
       // flush both MemTables so that the new type can be inserted into a new 
MemTable
-      TsFileProcessor workSequenceProcessor = 
workSequenceTsFileProcessors.get(timePartitionId);
-      if (workSequenceProcessor != null) {
-        fileFlushPolicy.apply(this, workSequenceProcessor, 
workSequenceProcessor.isSequence());
-      }
-      TsFileProcessor workUnsequenceProcessor = 
workUnsequenceTsFileProcessors.get(timePartitionId);
-      if (workUnsequenceProcessor != null) {
-        fileFlushPolicy.apply(this, workUnsequenceProcessor, 
workUnsequenceProcessor.isSequence());
-      }
+      flushWorkingProcessorsForTimePartition(timePartitionId);
+      return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId, 
infoForMetrics);
+    }
+  }
 
-      boolean isSequence =
+  private InsertRowsNode createGroupedInsertRowsNode(
+      final InsertRowsNode sourceInsertRowsNode, final InsertRowNode 
firstInsertRowNode) {
+    final InsertRowsNode groupedInsertRowsNode = 
sourceInsertRowsNode.emptyClone();
+    initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+    return groupedInsertRowsNode;
+  }
+
+  private InsertRowsNode createGroupedInsertRowsNode(
+      final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+      final InsertRowNode firstInsertRowNode) {
+    final InsertRowsNode groupedInsertRowsNode =
+        new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId());
+    initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+    return groupedInsertRowsNode;
+  }
+
+  private void initializeGroupedInsertRowsNode(
+      final InsertRowsNode groupedInsertRowsNode, final InsertRowNode 
firstInsertRowNode) {
+    groupedInsertRowsNode.setSearchIndex(firstInsertRowNode.getSearchIndex());
+    groupedInsertRowsNode.setAligned(firstInsertRowNode.isAligned());
+    if (firstInsertRowNode.isGeneratedByPipe()) {
+      groupedInsertRowsNode.markAsGeneratedByPipe();
+    }
+    if (firstInsertRowNode.isGeneratedByRemoteConsensusLeader()) {
+      groupedInsertRowsNode.markAsGeneratedByRemoteConsensusLeader();
+    }
+  }
+
+  private void appendInsertRowNode(
+      final InsertRowsNode groupedInsertRowsNode,
+      final InsertRowNode insertRowNode,
+      final int insertRowNodeIndex) {
+    if (groupedInsertRowsNode.isAligned() != insertRowNode.isAligned()) {
+      groupedInsertRowsNode.setMixingAlignment(true);
+    }
+    groupedInsertRowsNode.addOneInsertRowNode(insertRowNode, 
insertRowNodeIndex);
+    
groupedInsertRowsNode.updateProgressIndex(insertRowNode.getProgressIndex());
+  }
+
+  private void flushWorkingProcessorsForTimePartition(final long 
timePartitionId) {
+    TsFileProcessor workSequenceProcessor = 
workSequenceTsFileProcessors.get(timePartitionId);
+    if (workSequenceProcessor != null) {
+      fileFlushPolicy.apply(this, workSequenceProcessor, 
workSequenceProcessor.isSequence());
+    }
+    TsFileProcessor workUnsequenceProcessor = 
workUnsequenceTsFileProcessors.get(timePartitionId);
+    if (workUnsequenceProcessor != null) {
+      fileFlushPolicy.apply(this, workUnsequenceProcessor, 
workUnsequenceProcessor.isSequence());
+    }
+  }
+
+  private List<TsFileProcessor> retryInsertRowsAfterFlush(
+      final InsertRowsNode subInsertRowsNode,
+      final long timePartitionId,
+      final long[] infoForMetrics)
+      throws WriteProcessException {
+    final Map<TsFileProcessor, InsertRowsNode> retriedProcessorMap = new 
HashMap<>();
+    for (int i = 0; i < subInsertRowsNode.getInsertRowNodeList().size(); i++) {
+      final InsertRowNode insertRowNode = 
subInsertRowsNode.getInsertRowNodeList().get(i);
+      final boolean isSequence =
           config.isEnableSeparateData()
-              && firstRow.getTime()
-                  > lastFlushTimeMap.getFlushedTime(timePartitionId, 
firstRow.getDeviceID());
-      tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, 
isSequence);
-      registerToTsFile(subInsertRowsNode, tsFileProcessor);
-      tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+              && insertRowNode.getTime()
+                  > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
+      final TsFileProcessor retriedProcessor =
+          getOrCreateTsFileProcessor(timePartitionId, isSequence);
+      final int insertRowNodeIndex = 
subInsertRowsNode.getInsertRowNodeIndexList().get(i);
+      retriedProcessorMap.compute(
+          retriedProcessor,
+          (k, v) -> {
+            if (v == null) {
+              v = createGroupedInsertRowsNode(subInsertRowsNode, 
insertRowNode);
+            }
+            appendInsertRowNode(v, insertRowNode, insertRowNodeIndex);
+            return v;
+          });
+    }
+
+    final List<TsFileProcessor> insertedProcessors = new 
ArrayList<>(retriedProcessorMap.size());
+    for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry : 
retriedProcessorMap.entrySet()) {
+      final TsFileProcessor retriedProcessor = retriedEntry.getKey();
+      registerToTsFile(retriedEntry.getValue(), retriedProcessor);
+      retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics);
+      insertedProcessors.add(retriedProcessor);
+    }
+    return insertedProcessors;
+  }
+
+  private void recordInsertRowsFailure(
+      final InsertRowsNode sourceInsertRowsNode,
+      final InsertRowsNode failedInsertRowsNode,
+      final WriteProcessException exception) {
+    final TSStatus failureStatus =
+        RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+    for (Integer failedIndex : 
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+      sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+    }
+  }
+
+  private void recordInsertRowsFailure(
+      final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+      final InsertRowsNode failedInsertRowsNode,
+      final WriteProcessException exception) {
+    final TSStatus failureStatus =
+        RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+    for (Integer failedIndex : 
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+      sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
     }
-    return tsFileProcessor;
   }
 
   private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
@@ -1859,7 +1944,8 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean 
sequence) {
+  protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, 
boolean sequence)
+      throws WriteProcessException {
     TsFileProcessor tsFileProcessor = null;
     int retryCnt = 0;
     do {
@@ -1885,7 +1971,7 @@ public class DataRegion implements IDataRegionForQuery {
             "disk space is insufficient when creating TsFile processor, change 
system mode to read-only",
             e);
         
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
-        break;
+        throw new WriteProcessException(e.getMessage(), e.getErrorCode(), 
true);
       } catch (IOException e) {
         if (retryCnt < 3) {
           logger.warn("meet IOException when creating TsFileProcessor, retry 
it again", e);
@@ -1894,11 +1980,15 @@ public class DataRegion implements IDataRegionForQuery {
           logger.error(
               "meet IOException when creating TsFileProcessor, change system 
mode to error", e);
           
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
-          break;
+          throw new WriteProcessException(
+              String.format(
+                  "Failed to create TsFileProcessor for database %s, 
timePartitionId %s",
+                  databaseName, timeRangeId),
+              e);
         }
       } catch (ExceedQuotaException e) {
         logger.error(e.getMessage());
-        break;
+        throw new WriteProcessException(e.getMessage(), e.getErrorCode(), 
true);
       }
     } while (tsFileProcessor == null);
     return tsFileProcessor;
@@ -4490,8 +4580,13 @@ public class DataRegion implements IDataRegionForQuery {
             config.isEnableSeparateData()
                 && insertRowNode.getTime()
                     > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
-        TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, isSequence);
-        if (tsFileProcessor == null) {
+        final TsFileProcessor tsFileProcessor;
+        try {
+          tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, 
isSequence);
+        } catch (WriteProcessException e) {
+          insertRowsOfOneDeviceNode
+              .getResults()
+              .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
           continue;
         }
         int finalI = i;
@@ -4499,18 +4594,9 @@ public class DataRegion implements IDataRegionForQuery {
             tsFileProcessor,
             (k, v) -> {
               if (v == null) {
-                v = new 
InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
-                v.setSearchIndex(insertRowNode.getSearchIndex());
-                v.setAligned(insertRowNode.isAligned());
-                if (insertRowNode.isGeneratedByPipe()) {
-                  v.markAsGeneratedByPipe();
-                }
-                if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-                  v.markAsGeneratedByRemoteConsensusLeader();
-                }
+                v = createGroupedInsertRowsNode(insertRowsOfOneDeviceNode, 
insertRowNode);
               }
-              v.addOneInsertRowNode(insertRowNode, finalI);
-              v.updateProgressIndex(insertRowNode.getProgressIndex());
+              appendInsertRowNode(v, insertRowNode, finalI);
               return v;
             });
       }
@@ -4522,24 +4608,19 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumber
       for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
-        TsFileProcessor tsFileProcessor = entry.getKey();
         InsertRowsNode subInsertRowsNode = entry.getValue();
         try {
-          tsFileProcessor =
-              insertRowsWithTypeConsistencyCheck(
-                  tsFileProcessor, subInsertRowsNode, infoForMetrics);
+          List<TsFileProcessor> insertedProcessors =
+              insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, infoForMetrics);
+          
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+          for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+            // check memtable size and may asyncTryToFlush the work memtable
+            if (tsFileProcessor.shouldFlush()) {
+              fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+            }
+          }
         } catch (WriteProcessException e) {
-          insertRowsOfOneDeviceNode
-              .getResults()
-              .put(
-                  subInsertRowsNode.getInsertRowNodeIndexList().get(0),
-                  RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-        }
-        
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
-        // check memtable size and may asyncTryToFlush the work memtable
-        if (tsFileProcessor.shouldFlush()) {
-          fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+          recordInsertRowsFailure(insertRowsOfOneDeviceNode, 
subInsertRowsNode, e);
         }
       }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 68d76764920..19ac712882b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.calc.exception.QueryProcessException;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -32,8 +33,10 @@ import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.NonAlignedFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -46,6 +49,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -68,6 +72,8 @@ import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -84,6 +90,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,6 +107,9 @@ import java.util.concurrent.Future;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
 
 public class DataRegionTest {
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
@@ -149,6 +159,7 @@ public class DataRegionTest {
       dataRegion.syncDeleteDataFiles();
       StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
     }
+    TreeDeviceSchemaCacheManager.getInstance().cleanUp();
     EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
     CompactionTaskManager.getInstance().stop();
     EnvironmentUtils.cleanEnv();
@@ -1064,6 +1075,196 @@ public class DataRegionTest {
     dataRegion1.syncDeleteDataFiles();
   }
 
+  /**
+   * Checks that an exception raised while obtaining the TsFileProcessor surfaces from a
+   * single-row insert as a {@code WriteProcessRejectException} carrying the original message.
+   */
+  @Test
+  public void testInsertRowPropagatesTsFileProcessorCreationFailure()
+      throws IllegalPathException, DataRegionException, TsFileProcessorException {
+    final String device = "root.fail_row";
+    final String failureMessage = "mock creation failure";
+
+    final HookedDataRegion region = new HookedDataRegion(systemDir, device);
+    // Reject every processor request, whatever partition or ordering is asked for.
+    region.setTsFileProcessorSupplier(
+        (partition, isSequence) -> {
+          throw new WriteProcessRejectException(failureMessage);
+        });
+
+    final TSRecord singleRow = new TSRecord(device, 1);
+    singleRow.addTuple(
+        DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1)));
+    final InsertRowNode rowNode = buildInsertRowNodeByTSRecord(singleRow);
+
+    try {
+      region.insert(rowNode);
+      Assert.fail("Expected WriteProcessRejectException");
+    } catch (WriteProcessRejectException rejected) {
+      assertTrue(rejected.getMessage().contains(failureMessage));
+    } catch (WriteProcessException other) {
+      // Any other write failure means the original exception type was lost on the way up.
+      Assert.fail(
+          "Expected WriteProcessRejectException but got " + other.getClass().getSimpleName());
+    } finally {
+      region.syncDeleteDataFiles();
+    }
+  }
+
+  /**
+   * Hands the same mocked processor, whose {@code insertRows} always throws, to every row of a
+   * two-row batch and checks that both rows are reported with {@code WRITE_PROCESS_ERROR}.
+   */
+  @Test
+  public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws Exception {
+    final String device = "root.fail_rows";
+    final HookedDataRegion region = new HookedDataRegion(systemDir, device);
+
+    final TsFileProcessor failingProcessor = Mockito.mock(TsFileProcessor.class);
+    Mockito.when(failingProcessor.shouldFlush()).thenReturn(false);
+    Mockito.when(failingProcessor.isSequence()).thenReturn(true);
+    Mockito.doThrow(new WriteProcessException("mock insert rows failure"))
+        .when(failingProcessor)
+        .insertRows(any(InsertRowsNode.class), any(long[].class));
+    // The supplier ignores its arguments, so every request receives this one mock.
+    region.setTsFileProcessorSupplier((partition, isSequence) -> failingProcessor);
+
+    final long[] timestamps = {1, 2};
+    final List<InsertRowNode> rowNodes = new ArrayList<>();
+    for (int i = 0; i < timestamps.length; i++) {
+      final TSRecord row = new TSRecord(device, timestamps[i]);
+      row.addTuple(
+          DataPoint.getDataPoint(
+              TSDataType.INT32, measurementId, String.valueOf(timestamps[i])));
+      rowNodes.add(buildInsertRowNodeByTSRecord(row));
+    }
+    final List<Integer> rowIndexes = Arrays.asList(0, 1);
+    final InsertRowsNode batch = new InsertRowsNode(new PlanNodeId(""), rowIndexes, rowNodes);
+
+    try {
+      region.insert(batch);
+      Assert.fail("Expected BatchProcessException");
+    } catch (BatchProcessException expected) {
+      final int errorCode = TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode();
+      Assert.assertEquals(2, batch.getResults().size());
+      // Both rows, not only the first one, must carry the failure status.
+      for (int row = 0; row < 2; row++) {
+        Assert.assertEquals(errorCode, batch.getResults().get(row).getCode());
+      }
+    } finally {
+      region.syncDeleteDataFiles();
+    }
+  }
+
+  /**
+   * Verifies that a partly failing multi-row insert leaves the last cache at the row that was
+   * written rather than at the newer row that failed.
+   *
+   * <p>Two rows are inserted: time 1 (value 10) and a later row (value 20) whose time partition
+   * makes the hooked supplier throw. The insert must raise {@code BatchProcessException}, after
+   * which the declared last-cache entry must hold time 1 / value 10.
+   *
+   * <p>The global last-cache switch is restored in an outer {@code finally} and the region is
+   * cleaned in an inner one, so a failure in any setup step cannot leak the modified switch or
+   * leftover data files into other tests.
+   */
+  @Test
+  public void testInsertRowsLastCacheSkipsFailedRows() throws Exception {
+    final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+    COMMON_CONFIG.setLastCacheEnable(true);
+    try {
+      final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, "root.cache_rows");
+      try {
+        final TsFileProcessor successProcessor = Mockito.mock(TsFileProcessor.class);
+        Mockito.when(successProcessor.shouldFlush()).thenReturn(false);
+        Mockito.when(successProcessor.isSequence()).thenReturn(true);
+        // One interval past zero; presumably a different partition than time 1 - the test
+        // relies on that to make only the second row fail.
+        final long failingTime = TimePartitionUtils.getTimePartitionInterval() + 1;
+        final long failingPartitionId = TimePartitionUtils.getTimePartitionId(failingTime);
+        dataRegion1.setTsFileProcessorSupplier(
+            (timePartitionId, sequence) -> {
+              if (timePartitionId == failingPartitionId) {
+                throw new WriteProcessException("mock row failure");
+              }
+              return successProcessor;
+            });
+
+        final MeasurementPath lastCachePath =
+            new MeasurementPath(
+                "root.cache_rows",
+                measurementId,
+                new MeasurementSchema(
+                    measurementId,
+                    TSDataType.INT32,
+                    TSEncoding.PLAIN,
+                    CompressionType.UNCOMPRESSED));
+        TreeDeviceSchemaCacheManager.getInstance()
+            .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+        final List<Integer> indexList = Arrays.asList(0, 1);
+        final List<InsertRowNode> nodes = new ArrayList<>();
+        final long[] times = new long[] {1, failingTime};
+        final int[] values = new int[] {10, 20};
+        for (int i = 0; i < times.length; i++) {
+          final TSRecord record = new TSRecord("root.cache_rows", times[i]);
+          record.addTuple(
+              DataPoint.getDataPoint(
+                  TSDataType.INT32, measurementId, String.valueOf(values[i])));
+          nodes.add(buildInsertRowNodeByTSRecord(record));
+        }
+        final InsertRowsNode insertRowsNode =
+            new InsertRowsNode(new PlanNodeId(""), indexList, nodes);
+
+        try {
+          dataRegion1.insert(insertRowsNode);
+          Assert.fail("Expected BatchProcessException");
+        } catch (BatchProcessException expected) {
+          // Only the successful row (time 1, value 10) may be visible in the last cache.
+          final TimeValuePair lastCache =
+              TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+          Assert.assertNotNull(lastCache);
+          Assert.assertEquals(1, lastCache.getTimestamp());
+          Assert.assertEquals(10, lastCache.getValue().getInt());
+        }
+      } finally {
+        dataRegion1.syncDeleteDataFiles();
+      }
+    } finally {
+      COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+    }
+  }
+
+  /**
+   * Verifies that a tablet insert whose second row fails leaves the last cache at the first,
+   * successfully written row.
+   *
+   * <p>The mocked processor marks row 0 as successful and row 1 as failed in the status array it
+   * is given, then throws. The insert must raise {@code BatchProcessException}, after which the
+   * declared last-cache entry must hold time 1 / value 10 rather than time 2 / value 20.
+   *
+   * <p>The global last-cache switch is restored in an outer {@code finally} and the region is
+   * cleaned in an inner one, so a failure in any setup step cannot leak the modified switch or
+   * leftover data files into other tests.
+   */
+  @Test
+  public void testInsertTabletLastCacheSkipsFailedRows() throws Exception {
+    final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+    COMMON_CONFIG.setLastCacheEnable(true);
+    try {
+      final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir, "root.cache_tablet");
+      try {
+        final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+        Mockito.doAnswer(
+                invocation -> {
+                  // Argument 2 is the per-row status array passed to insertTablet.
+                  final TSStatus[] results = invocation.getArgument(2);
+                  results[0] = RpcUtils.SUCCESS_STATUS;
+                  results[1] =
+                      RpcUtils.getStatus(
+                          TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
+                          "mock row failure");
+                  throw new WriteProcessException("mock tablet failure");
+                })
+            .when(processor)
+            .insertTablet(
+                any(InsertTabletNode.class),
+                anyList(),
+                any(TSStatus[].class),
+                anyBoolean(),
+                any(long[].class));
+        Mockito.when(processor.shouldFlush()).thenReturn(false);
+        Mockito.when(processor.isSequence()).thenReturn(true);
+        dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) -> processor);
+
+        final MeasurementPath lastCachePath =
+            new MeasurementPath(
+                "root.cache_tablet",
+                measurementId,
+                new MeasurementSchema(
+                    measurementId,
+                    TSDataType.INT32,
+                    TSEncoding.PLAIN,
+                    CompressionType.UNCOMPRESSED));
+        TreeDeviceSchemaCacheManager.getInstance()
+            .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+        final String[] measurements = new String[] {measurementId};
+        final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32};
+        final MeasurementSchema[] measurementSchemas =
+            new MeasurementSchema[] {
+              new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)
+            };
+        final long[] times = new long[] {1, 2};
+        final Object[] columns = new Object[] {new int[] {10, 20}};
+        final InsertTabletNode insertTabletNode =
+            new InsertTabletNode(
+                new QueryId("test_write").genPlanNodeId(),
+                new PartialPath("root.cache_tablet"),
+                false,
+                measurements,
+                dataTypes,
+                measurementSchemas,
+                times,
+                null,
+                columns,
+                times.length);
+
+        try {
+          dataRegion1.insertTablet(insertTabletNode);
+          Assert.fail("Expected BatchProcessException");
+        } catch (BatchProcessException expected) {
+          // Only the successful row (time 1, value 10) may be visible in the last cache.
+          final TimeValuePair lastCache =
+              TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+          Assert.assertNotNull(lastCache);
+          Assert.assertEquals(1, lastCache.getTimestamp());
+          Assert.assertEquals(10, lastCache.getValue().getInt());
+        }
+      } finally {
+        dataRegion1.syncDeleteDataFiles();
+      }
+    } finally {
+      COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+    }
+  }
+
   @Test
   public void testSmallReportProportionInsertRow()
       throws WriteProcessException,
@@ -1667,6 +1868,32 @@ public class DataRegionTest {
     }
   }
 
+  /**
+   * Test hook that stands in for the region's TsFileProcessor lookup.
+   *
+   * <p>Declared as a functional interface because the tests supply it as a lambda; the
+   * annotation makes the compiler reject any change that would break that usage.
+   */
+  @FunctionalInterface
+  private interface TsFileProcessorSupplier {
+    /**
+     * Returns the processor to use for the given request, or throws to simulate a failure.
+     *
+     * @param timePartitionId the time partition the caller asked for
+     * @param sequence the sequence flag the caller asked for
+     * @return the processor to hand back to the region
+     * @throws WriteProcessException to simulate a failed lookup or creation
+     */
+    TsFileProcessor get(long timePartitionId, boolean sequence) throws WriteProcessException;
+  }
+
+  /**
+   * A {@code DummyDataRegion} whose TsFileProcessor lookup can be redirected by a test.
+   *
+   * <p>While a supplier is installed, {@code getOrCreateTsFileProcessor} returns (or throws)
+   * whatever the supplier does; with no supplier the superclass implementation is used.
+   */
+  private static class HookedDataRegion extends DummyDataRegion {
+    /** Replacement lookup; {@code null} means the superclass lookup is used. */
+    private TsFileProcessorSupplier tsFileProcessorSupplier;
+
+    private HookedDataRegion(String systemInfoDir, String storageGroupName)
+        throws DataRegionException {
+      super(systemInfoDir, storageGroupName);
+    }
+
+    /** Installs the lookup consulted by later {@code getOrCreateTsFileProcessor} calls. */
+    private void setTsFileProcessorSupplier(TsFileProcessorSupplier tsFileProcessorSupplier) {
+      this.tsFileProcessorSupplier = tsFileProcessorSupplier;
+    }
+
+    @Override
+    protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence)
+        throws WriteProcessException {
+      final TsFileProcessorSupplier hook = tsFileProcessorSupplier;
+      if (hook == null) {
+        // No hook installed: behave exactly like the parent region.
+        return super.getOrCreateTsFileProcessor(timeRangeId, sequence);
+      }
+      return hook.get(timeRangeId, sequence);
+    }
+  }
+
   // -- test for deleting data directly
   // -- delete data and file only when:
   // 1. tsfile is closed


Reply via email to