This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch insert-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/insert-fix by this push:
new 404ea0c0837 insert-fix
404ea0c0837 is described below
commit 404ea0c0837296ced2f09eab05c232600eeada40
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 28 16:52:04 2026 +0800
insert-fix
---
.../planner/plan/node/write/InsertTabletNode.java | 60 ++++-
.../node/write/RelationalInsertTabletNode.java | 6 +-
.../db/storageengine/dataregion/DataRegion.java | 275 +++++++++++++--------
.../storageengine/dataregion/DataRegionTest.java | 227 +++++++++++++++++
4 files changed, 458 insertions(+), 110 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 490f16ca5f0..995e8a95e3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1193,36 +1193,72 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
if (lastIdx < startOffset) {
return null;
}
+ return composeTimeValuePair(measurementIndex, lastIdx);
+ }
+
+ public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+ return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+ }
+
+ protected TimeValuePair composeLastTimeValuePair(
+ final int measurementIndex,
+ final TSStatus[] results,
+ final int startOffset,
+ final int endOffset) {
+ if (results == null) {
+ return composeLastTimeValuePair(measurementIndex, startOffset,
endOffset);
+ }
+ if (measurementIndex >= columns.length ||
Objects.isNull(dataTypes[measurementIndex])) {
+ return null;
+ }
+
+ final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex];
+ int lastIdx = Math.min(endOffset - 1, rowCount - 1);
+ while (lastIdx >= startOffset) {
+ if (results[lastIdx] != null
+ && results[lastIdx].getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ lastIdx--;
+ continue;
+ }
+ if (bitMap != null && bitMap.isMarked(lastIdx)) {
+ lastIdx--;
+ continue;
+ }
+ break;
+ }
+ return lastIdx < startOffset ? null :
composeTimeValuePair(measurementIndex, lastIdx);
+ }
+ private TimeValuePair composeTimeValuePair(final int measurementIndex, final
int rowIndex) {
TsPrimitiveType value;
switch (dataTypes[measurementIndex]) {
case INT32:
case DATE:
int[] intValues = (int[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsInt(intValues[lastIdx]);
+ value = new TsPrimitiveType.TsInt(intValues[rowIndex]);
break;
case INT64:
case TIMESTAMP:
long[] longValues = (long[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsLong(longValues[lastIdx]);
+ value = new TsPrimitiveType.TsLong(longValues[rowIndex]);
break;
case FLOAT:
float[] floatValues = (float[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]);
+ value = new TsPrimitiveType.TsFloat(floatValues[rowIndex]);
break;
case DOUBLE:
double[] doubleValues = (double[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]);
+ value = new TsPrimitiveType.TsDouble(doubleValues[rowIndex]);
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]);
+ value = new TsPrimitiveType.TsBoolean(boolValues[rowIndex]);
break;
case TEXT:
case BLOB:
case STRING:
Binary[] binaryValues = (Binary[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
+ value = new TsPrimitiveType.TsBinary(binaryValues[rowIndex]);
break;
case OBJECT:
return null;
@@ -1230,11 +1266,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
}
- return new TimeValuePair(times[lastIdx], value);
- }
-
- public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
- return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+ return new TimeValuePair(times[rowIndex], value);
}
public IDeviceID getDeviceID(int rowIdx) {
@@ -1313,10 +1345,14 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
public void updateLastCache(final String databaseName) {
+ updateLastCache(databaseName, null);
+ }
+
+ public void updateLastCache(final String databaseName, final TSStatus[]
results) {
final String[] rawMeasurements = getRawMeasurements();
final TimeValuePair[] timeValuePairs = new
TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
- timeValuePairs[i] = composeLastTimeValuePair(i);
+ timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
}
TreeDeviceSchemaCacheManager.getInstance()
.updateLastCacheIfExists(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index b9f7edbe87c..8d24ad77364 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -371,6 +371,10 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
@Override
public void updateLastCache(final String databaseName) {
+ updateLastCache(databaseName, null);
+ }
+
+ public void updateLastCache(final String databaseName, final TSStatus[]
results) {
final String[] rawMeasurements = getRawMeasurements();
final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
splitByDevice(0, rowCount);
@@ -381,7 +385,7 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
final TimeValuePair[] timeValuePairs = new
TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
- timeValuePairs[i] = composeLastTimeValuePair(i, startOffset,
endOffset);
+ timeValuePairs[i] = composeLastTimeValuePair(i, results, startOffset,
endOffset);
}
TableDeviceSchemaCache.getInstance()
.updateLastCacheIfExists(databaseName, deviceID, rawMeasurements,
timeValuePairs);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 933efbae84c..e5d75b4a210 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1455,7 +1455,7 @@ public class DataRegion implements IDataRegionForQuery {
&& !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
// disable updating last cache on follower
long startTime = System.nanoTime();
- tryToUpdateInsertTabletLastCache(insertTabletNode);
+ tryToUpdateInsertTabletLastCache(insertTabletNode, results);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
- startTime);
}
return noFailure;
@@ -1506,18 +1506,12 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
- TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
- if (tsFileProcessor == null) {
- for (int[] rangePair : rangeList) {
- int start = rangePair[0];
- int end = rangePair[1];
- for (int i = start; i < end; i++) {
- results[i] =
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR,
- "can not create TsFileProcessor, timePartitionId: " +
timePartitionId);
- }
- }
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
+ } catch (WriteProcessException e) {
+ markInsertTabletRangesFailed(
+ rangeList, results, RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
return false;
}
@@ -1546,6 +1540,15 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
+ private void markInsertTabletRangesFailed(
+ final List<int[]> rangeList, final TSStatus[] results, final TSStatus
failureStatus) {
+ for (int[] rangePair : rangeList) {
+ for (int i = rangePair[0]; i < rangePair[1]; i++) {
+ results[i] = failureStatus;
+ }
+ }
+ }
+
private TableSchema getTableSchemaFromCache(
final String database, final String tableName, final Pair<Long, Long>
currentVersion) {
final TableSchemaCacheKey key = new TableSchemaCacheKey(database,
tableName);
@@ -1679,6 +1682,11 @@ public class DataRegion implements IDataRegionForQuery {
node.updateLastCache(getDatabaseName());
}
+ private void tryToUpdateInsertTabletLastCache(
+ final InsertTabletNode node, final TSStatus[] results) {
+ node.updateLastCache(getDatabaseName(), results);
+ }
+
private TsFileProcessor insertToTsFileProcessor(
InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
throws WriteProcessException {
@@ -1686,19 +1694,16 @@ public class DataRegion implements IDataRegionForQuery {
return null;
}
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
- if (tsFileProcessor == null) {
- return null;
- }
long[] infoForMetrics = new long[5];
// infoForMetrics[0]: CreateMemtableBlockTimeCost
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
// infoForMetrics[2]: ScheduleWalTimeCost
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
- tsFileProcessor.insert(insertRowNode, infoForMetrics);
- updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
// register TableSchema (and maybe more) for table insertion
registerToTsFile(insertRowNode, tsFileProcessor);
+ tsFileProcessor.insert(insertRowNode, infoForMetrics);
+ updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
return tsFileProcessor;
}
@@ -1717,9 +1722,11 @@ public class DataRegion implements IDataRegionForQuery {
if (insertRowNode.allMeasurementFailed()) {
continue;
}
- TsFileProcessor tsFileProcessor =
- getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
- if (tsFileProcessor == null) {
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i],
areSequence[i]);
+ } catch (WriteProcessException e) {
+ insertRowsNode.getResults().put(i,
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
continue;
}
int finalI = i;
@@ -1727,78 +1734,156 @@ public class DataRegion implements IDataRegionForQuery
{
tsFileProcessor,
(k, v) -> {
if (v == null) {
- v = insertRowsNode.emptyClone();
- v.setSearchIndex(insertRowNode.getSearchIndex());
- v.setAligned(insertRowNode.isAligned());
- if (insertRowNode.isGeneratedByPipe()) {
- v.markAsGeneratedByPipe();
- }
- if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- v.markAsGeneratedByRemoteConsensusLeader();
- }
+ v = createGroupedInsertRowsNode(insertRowsNode, insertRowNode);
}
- if (v.isAligned() != insertRowNode.isAligned()) {
- v.setMixingAlignment(true);
- }
- v.addOneInsertRowNode(insertRowNode, finalI);
- v.updateProgressIndex(insertRowNode.getProgressIndex());
+ appendInsertRowNode(v, insertRowNode, finalI);
return v;
});
}
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
- TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor =
- insertRowsWithTypeConsistencyCheck(tsFileProcessor,
subInsertRowsNode, infoForMetrics);
+ List<TsFileProcessor> insertedProcessors =
+ insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, infoForMetrics);
+
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+ for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+ // check memtable size and may asyncTryToFlush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ }
+ }
} catch (WriteProcessException e) {
- insertRowsNode
- .getResults()
- .put(
- subInsertRowsNode.getInsertRowNodeIndexList().get(0),
- RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
-
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
- // check memtable size and may asyncTryToFlush the work memtable
- if (entry.getKey().shouldFlush()) {
- fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ recordInsertRowsFailure(insertRowsNode, subInsertRowsNode, e);
}
}
return executedInsertRowNodeList;
}
- private TsFileProcessor insertRowsWithTypeConsistencyCheck(
+ private List<TsFileProcessor> insertRowsWithTypeConsistencyCheck(
TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode,
long[] infoForMetrics)
throws WriteProcessException {
try {
// register TableSchema (and maybe more) for table insertion
registerToTsFile(subInsertRowsNode, tsFileProcessor);
tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ return Collections.singletonList(tsFileProcessor);
} catch (DataTypeInconsistentException e) {
InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0);
long timePartitionId =
TimePartitionUtils.getTimePartitionId(firstRow.getTime());
// flush both MemTables so that the new type can be inserted into a new
MemTable
- TsFileProcessor workSequenceProcessor =
workSequenceTsFileProcessors.get(timePartitionId);
- if (workSequenceProcessor != null) {
- fileFlushPolicy.apply(this, workSequenceProcessor,
workSequenceProcessor.isSequence());
- }
- TsFileProcessor workUnsequenceProcessor =
workUnsequenceTsFileProcessors.get(timePartitionId);
- if (workUnsequenceProcessor != null) {
- fileFlushPolicy.apply(this, workUnsequenceProcessor,
workUnsequenceProcessor.isSequence());
- }
+ flushWorkingProcessorsForTimePartition(timePartitionId);
+ return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId,
infoForMetrics);
+ }
+ }
- boolean isSequence =
+ private InsertRowsNode createGroupedInsertRowsNode(
+ final InsertRowsNode sourceInsertRowsNode, final InsertRowNode
firstInsertRowNode) {
+ final InsertRowsNode groupedInsertRowsNode =
sourceInsertRowsNode.emptyClone();
+ initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+ return groupedInsertRowsNode;
+ }
+
+ private InsertRowsNode createGroupedInsertRowsNode(
+ final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+ final InsertRowNode firstInsertRowNode) {
+ final InsertRowsNode groupedInsertRowsNode =
+ new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId());
+ initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+ return groupedInsertRowsNode;
+ }
+
+ private void initializeGroupedInsertRowsNode(
+ final InsertRowsNode groupedInsertRowsNode, final InsertRowNode
firstInsertRowNode) {
+ groupedInsertRowsNode.setSearchIndex(firstInsertRowNode.getSearchIndex());
+ groupedInsertRowsNode.setAligned(firstInsertRowNode.isAligned());
+ if (firstInsertRowNode.isGeneratedByPipe()) {
+ groupedInsertRowsNode.markAsGeneratedByPipe();
+ }
+ if (firstInsertRowNode.isGeneratedByRemoteConsensusLeader()) {
+ groupedInsertRowsNode.markAsGeneratedByRemoteConsensusLeader();
+ }
+ }
+
+ private void appendInsertRowNode(
+ final InsertRowsNode groupedInsertRowsNode,
+ final InsertRowNode insertRowNode,
+ final int insertRowNodeIndex) {
+ if (groupedInsertRowsNode.isAligned() != insertRowNode.isAligned()) {
+ groupedInsertRowsNode.setMixingAlignment(true);
+ }
+ groupedInsertRowsNode.addOneInsertRowNode(insertRowNode,
insertRowNodeIndex);
+
groupedInsertRowsNode.updateProgressIndex(insertRowNode.getProgressIndex());
+ }
+
+ private void flushWorkingProcessorsForTimePartition(final long
timePartitionId) {
+ TsFileProcessor workSequenceProcessor =
workSequenceTsFileProcessors.get(timePartitionId);
+ if (workSequenceProcessor != null) {
+ fileFlushPolicy.apply(this, workSequenceProcessor,
workSequenceProcessor.isSequence());
+ }
+ TsFileProcessor workUnsequenceProcessor =
workUnsequenceTsFileProcessors.get(timePartitionId);
+ if (workUnsequenceProcessor != null) {
+ fileFlushPolicy.apply(this, workUnsequenceProcessor,
workUnsequenceProcessor.isSequence());
+ }
+ }
+
+ private List<TsFileProcessor> retryInsertRowsAfterFlush(
+ final InsertRowsNode subInsertRowsNode,
+ final long timePartitionId,
+ final long[] infoForMetrics)
+ throws WriteProcessException {
+ final Map<TsFileProcessor, InsertRowsNode> retriedProcessorMap = new
HashMap<>();
+ for (int i = 0; i < subInsertRowsNode.getInsertRowNodeList().size(); i++) {
+ final InsertRowNode insertRowNode =
subInsertRowsNode.getInsertRowNodeList().get(i);
+ final boolean isSequence =
config.isEnableSeparateData()
- && firstRow.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
firstRow.getDeviceID());
- tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId,
isSequence);
- registerToTsFile(subInsertRowsNode, tsFileProcessor);
- tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ && insertRowNode.getTime()
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ final TsFileProcessor retriedProcessor =
+ getOrCreateTsFileProcessor(timePartitionId, isSequence);
+ final int insertRowNodeIndex =
subInsertRowsNode.getInsertRowNodeIndexList().get(i);
+ retriedProcessorMap.compute(
+ retriedProcessor,
+ (k, v) -> {
+ if (v == null) {
+ v = createGroupedInsertRowsNode(subInsertRowsNode,
insertRowNode);
+ }
+ appendInsertRowNode(v, insertRowNode, insertRowNodeIndex);
+ return v;
+ });
+ }
+
+ final List<TsFileProcessor> insertedProcessors = new
ArrayList<>(retriedProcessorMap.size());
+ for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry :
retriedProcessorMap.entrySet()) {
+ final TsFileProcessor retriedProcessor = retriedEntry.getKey();
+ registerToTsFile(retriedEntry.getValue(), retriedProcessor);
+ retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics);
+ insertedProcessors.add(retriedProcessor);
+ }
+ return insertedProcessors;
+ }
+
+ private void recordInsertRowsFailure(
+ final InsertRowsNode sourceInsertRowsNode,
+ final InsertRowsNode failedInsertRowsNode,
+ final WriteProcessException exception) {
+ final TSStatus failureStatus =
+ RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+ for (Integer failedIndex :
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+ sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+ }
+ }
+
+ private void recordInsertRowsFailure(
+ final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+ final InsertRowsNode failedInsertRowsNode,
+ final WriteProcessException exception) {
+ final TSStatus failureStatus =
+ RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+ for (Integer failedIndex :
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+ sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
}
- return tsFileProcessor;
}
private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
@@ -1859,7 +1944,8 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean
sequence) {
+ protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId,
boolean sequence)
+ throws WriteProcessException {
TsFileProcessor tsFileProcessor = null;
int retryCnt = 0;
do {
@@ -1885,7 +1971,7 @@ public class DataRegion implements IDataRegionForQuery {
"disk space is insufficient when creating TsFile processor, change
system mode to read-only",
e);
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
- break;
+ throw new WriteProcessException(e.getMessage(), e.getErrorCode(),
true);
} catch (IOException e) {
if (retryCnt < 3) {
logger.warn("meet IOException when creating TsFileProcessor, retry
it again", e);
@@ -1894,11 +1980,15 @@ public class DataRegion implements IDataRegionForQuery {
logger.error(
"meet IOException when creating TsFileProcessor, change system
mode to error", e);
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
- break;
+ throw new WriteProcessException(
+ String.format(
+ "Failed to create TsFileProcessor for database %s,
timePartitionId %s",
+ databaseName, timeRangeId),
+ e);
}
} catch (ExceedQuotaException e) {
logger.error(e.getMessage());
- break;
+ throw new WriteProcessException(e.getMessage(), e.getErrorCode(),
true);
}
} while (tsFileProcessor == null);
return tsFileProcessor;
@@ -4490,8 +4580,13 @@ public class DataRegion implements IDataRegionForQuery {
config.isEnableSeparateData()
&& insertRowNode.getTime()
> lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
- TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, isSequence);
- if (tsFileProcessor == null) {
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId,
isSequence);
+ } catch (WriteProcessException e) {
+ insertRowsOfOneDeviceNode
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
continue;
}
int finalI = i;
@@ -4499,18 +4594,9 @@ public class DataRegion implements IDataRegionForQuery {
tsFileProcessor,
(k, v) -> {
if (v == null) {
- v = new
InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
- v.setSearchIndex(insertRowNode.getSearchIndex());
- v.setAligned(insertRowNode.isAligned());
- if (insertRowNode.isGeneratedByPipe()) {
- v.markAsGeneratedByPipe();
- }
- if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- v.markAsGeneratedByRemoteConsensusLeader();
- }
+ v = createGroupedInsertRowsNode(insertRowsOfOneDeviceNode,
insertRowNode);
}
- v.addOneInsertRowNode(insertRowNode, finalI);
- v.updateProgressIndex(insertRowNode.getProgressIndex());
+ appendInsertRowNode(v, insertRowNode, finalI);
return v;
});
}
@@ -4522,24 +4608,19 @@ public class DataRegion implements IDataRegionForQuery {
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
- TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor =
- insertRowsWithTypeConsistencyCheck(
- tsFileProcessor, subInsertRowsNode, infoForMetrics);
+ List<TsFileProcessor> insertedProcessors =
+ insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, infoForMetrics);
+
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+ for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+ // check memtable size and may asyncTryToFlush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ }
+ }
} catch (WriteProcessException e) {
- insertRowsOfOneDeviceNode
- .getResults()
- .put(
- subInsertRowsNode.getInsertRowNodeIndexList().get(0),
- RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
-
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
- // check memtable size and may asyncTryToFlush the work memtable
- if (tsFileProcessor.shouldFlush()) {
- fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ recordInsertRowsFailure(insertRowsOfOneDeviceNode,
subInsertRowsNode, e);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 68d76764920..19ac712882b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.iotdb.calc.exception.QueryProcessException;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -32,8 +33,10 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
@@ -46,6 +49,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -68,6 +72,8 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -84,6 +90,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,6 +107,9 @@ import java.util.concurrent.Future;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
public class DataRegionTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -149,6 +159,7 @@ public class DataRegionTest {
dataRegion.syncDeleteDataFiles();
StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
}
+ TreeDeviceSchemaCacheManager.getInstance().cleanUp();
EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
CompactionTaskManager.getInstance().stop();
EnvironmentUtils.cleanEnv();
@@ -1064,6 +1075,196 @@ public class DataRegionTest {
dataRegion1.syncDeleteDataFiles();
}
+ @Test
+ public void testInsertRowPropagatesTsFileProcessorCreationFailure()
+ throws IllegalPathException, DataRegionException,
TsFileProcessorException {
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.fail_row");
+ dataRegion1.setTsFileProcessorSupplier(
+ (timePartitionId, sequence) -> {
+ throw new WriteProcessRejectException("mock creation failure");
+ });
+
+ final TSRecord record = new TSRecord("root.fail_row", 1);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1)));
+ final InsertRowNode insertRowNode = buildInsertRowNodeByTSRecord(record);
+
+ try {
+ dataRegion1.insert(insertRowNode);
+ Assert.fail("Expected WriteProcessRejectException");
+ } catch (WriteProcessRejectException e) {
+ Assert.assertTrue(e.getMessage().contains("mock creation failure"));
+ } catch (WriteProcessException e) {
+ Assert.fail("Expected WriteProcessRejectException but got " +
e.getClass().getSimpleName());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ }
+ }
+
+ @Test
+ public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws
Exception {
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.fail_rows");
+ final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+ Mockito.doThrow(new WriteProcessException("mock insert rows failure"))
+ .when(processor)
+ .insertRows(any(InsertRowsNode.class), any(long[].class));
+ Mockito.when(processor.shouldFlush()).thenReturn(false);
+ Mockito.when(processor.isSequence()).thenReturn(true);
+ dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) ->
processor);
+
+ final List<Integer> indexList = Arrays.asList(0, 1);
+ final List<InsertRowNode> nodes = new ArrayList<>();
+ for (long time : new long[] {1, 2}) {
+ final TSRecord record = new TSRecord("root.fail_rows", time);
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(time)));
+ nodes.add(buildInsertRowNodeByTSRecord(record));
+ }
+ final InsertRowsNode insertRowsNode = new InsertRowsNode(new
PlanNodeId(""), indexList, nodes);
+
+ try {
+ dataRegion1.insert(insertRowsNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ Assert.assertEquals(2, insertRowsNode.getResults().size());
+ Assert.assertEquals(
+ TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+ insertRowsNode.getResults().get(0).getCode());
+ Assert.assertEquals(
+ TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+ insertRowsNode.getResults().get(1).getCode());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ }
+ }
+
+ @Test
+ public void testInsertRowsLastCacheSkipsFailedRows() throws Exception {
+ final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+ COMMON_CONFIG.setLastCacheEnable(true);
+
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.cache_rows");
+ final TsFileProcessor successProcessor =
Mockito.mock(TsFileProcessor.class);
+ Mockito.when(successProcessor.shouldFlush()).thenReturn(false);
+ Mockito.when(successProcessor.isSequence()).thenReturn(true);
+ final long failingTime = TimePartitionUtils.getTimePartitionInterval() + 1;
+ final long failingPartitionId =
TimePartitionUtils.getTimePartitionId(failingTime);
+ dataRegion1.setTsFileProcessorSupplier(
+ (timePartitionId, sequence) -> {
+ if (timePartitionId == failingPartitionId) {
+ throw new WriteProcessException("mock row failure");
+ }
+ return successProcessor;
+ });
+
+ final MeasurementPath lastCachePath =
+ new MeasurementPath(
+ "root.cache_rows",
+ measurementId,
+ new MeasurementSchema(
+ measurementId, TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED));
+ TreeDeviceSchemaCacheManager.getInstance()
+ .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+ final List<Integer> indexList = Arrays.asList(0, 1);
+ final List<InsertRowNode> nodes = new ArrayList<>();
+ final long[] times = new long[] {1, failingTime};
+ final int[] values = new int[] {10, 20};
+ for (int i = 0; i < times.length; i++) {
+ final long time = times[i];
+ final TSRecord record = new TSRecord("root.cache_rows", time);
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(values[i])));
+ nodes.add(buildInsertRowNodeByTSRecord(record));
+ }
+ final InsertRowsNode insertRowsNode = new InsertRowsNode(new
PlanNodeId(""), indexList, nodes);
+
+ try {
+ dataRegion1.insert(insertRowsNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ final TimeValuePair lastCache =
+
TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+ Assert.assertNotNull(lastCache);
+ Assert.assertEquals(1, lastCache.getTimestamp());
+ Assert.assertEquals(10, lastCache.getValue().getInt());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+ }
+ }
+
+ @Test
+ public void testInsertTabletLastCacheSkipsFailedRows() throws Exception {
+ final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+ COMMON_CONFIG.setLastCacheEnable(true);
+
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.cache_tablet");
+ final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+ Mockito.doAnswer(
+ invocation -> {
+ TSStatus[] results = invocation.getArgument(2);
+ results[0] = RpcUtils.SUCCESS_STATUS;
+ results[1] =
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
"mock row failure");
+ throw new WriteProcessException("mock tablet failure");
+ })
+ .when(processor)
+ .insertTablet(
+ any(InsertTabletNode.class),
+ anyList(),
+ any(TSStatus[].class),
+ anyBoolean(),
+ any(long[].class));
+ Mockito.when(processor.shouldFlush()).thenReturn(false);
+ Mockito.when(processor.isSequence()).thenReturn(true);
+ dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) ->
processor);
+
+ final MeasurementPath lastCachePath =
+ new MeasurementPath(
+ "root.cache_tablet",
+ measurementId,
+ new MeasurementSchema(
+ measurementId, TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED));
+ TreeDeviceSchemaCacheManager.getInstance()
+ .declareLastCache(dataRegion1.getDatabaseName(), lastCachePath);
+
+ final String[] measurements = new String[] {measurementId};
+ final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32};
+ final MeasurementSchema[] measurementSchemas =
+ new MeasurementSchema[] {
+ new MeasurementSchema(measurementId, TSDataType.INT32,
TSEncoding.PLAIN)
+ };
+ final long[] times = new long[] {1, 2};
+ final Object[] columns = new Object[] {new int[] {10, 20}};
+ final InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath("root.cache_tablet"),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ null,
+ columns,
+ times.length);
+
+ try {
+ dataRegion1.insertTablet(insertTabletNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ final TimeValuePair lastCache =
+
TreeDeviceSchemaCacheManager.getInstance().getLastCache(lastCachePath);
+ Assert.assertNotNull(lastCache);
+ Assert.assertEquals(1, lastCache.getTimestamp());
+ Assert.assertEquals(10, lastCache.getValue().getInt());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+ }
+ }
+
@Test
public void testSmallReportProportionInsertRow()
throws WriteProcessException,
@@ -1667,6 +1868,32 @@ public class DataRegionTest {
}
}
+ private interface TsFileProcessorSupplier {
+ TsFileProcessor get(long timePartitionId, boolean sequence) throws
WriteProcessException;
+ }
+
+ private static class HookedDataRegion extends DummyDataRegion {
+ private TsFileProcessorSupplier tsFileProcessorSupplier;
+
+ private HookedDataRegion(String systemInfoDir, String storageGroupName)
+ throws DataRegionException {
+ super(systemInfoDir, storageGroupName);
+ }
+
+ private void setTsFileProcessorSupplier(TsFileProcessorSupplier
tsFileProcessorSupplier) {
+ this.tsFileProcessorSupplier = tsFileProcessorSupplier;
+ }
+
+ @Override
+ protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId,
boolean sequence)
+ throws WriteProcessException {
+ if (tsFileProcessorSupplier != null) {
+ return tsFileProcessorSupplier.get(timeRangeId, sequence);
+ }
+ return super.getOrCreateTsFileProcessor(timeRangeId, sequence);
+ }
+ }
+
// -- test for deleting data directly
// -- delete data and file only when:
// 1. tsfile is closed