This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 8ebfcb99883 Load: convert to insert tablet on region replica set changes (#14717) (#14833)
8ebfcb99883 is described below
commit 8ebfcb99883be1f9f61b56d0b7c9056d54b01414
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Feb 14 18:00:45 2025 +0800
Load: convert to insert tablet on region replica set changes (#14717) (#14833)
* Load: convert to insert tablet on region replica set changes (#14717)
(cherry picked from commit 7ac71fb2a33fdaae4fe56804ce0a2c48640dbed8)
* resolve
---
.../plan/analyze/LoadTsFileAnalyzer.java | 7 ++++
.../plan/node/load/LoadTsFilePieceNode.java | 6 +--
.../plan/scheduler/load/LoadTsFileScheduler.java | 47 +++++++++-------------
.../iotdb/db/storageengine/StorageEngine.java | 23 ++++++++++-
...eeStatementDataTypeConvertExecutionVisitor.java | 4 ++
5 files changed, 53 insertions(+), 34 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 5604b687068..30cd8c23389 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -236,9 +236,16 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
: null;
if (status == null) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {}. Status is
null.",
+ loadTsFileStatement);
analysis.setFailStatus(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
} else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {}. Status: {}",
+ loadTsFileStatement,
+ status);
analysis.setFailStatus(status);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index 5d4f02378fc..f578148e014 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -155,9 +155,9 @@ public class LoadTsFilePieceNode extends WritePlanNode {
InputStream stream = new ByteArrayInputStream(buffer.array());
try {
ReadWriteIOUtils.readShort(stream); // read PlanNodeType
- File tsFile = new File(ReadWriteIOUtils.readString(stream));
- LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new
PlanNodeId(""), tsFile);
- int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
+ final File tsFile = new File(ReadWriteIOUtils.readString(stream));
+ final LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new
PlanNodeId(""), tsFile);
+ final int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
for (int i = 0; i < tsFileDataSize; i++) {
TsFileData tsFileData = TsFileData.deserialize(stream);
pieceNode.addTsFileData(tsFileData);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index be3c9345f80..eaa23a59cf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -90,6 +90,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -204,8 +205,7 @@ public class LoadTsFileScheduler implements IScheduler {
long startTime = System.nanoTime();
final boolean isFirstPhaseSuccess;
try {
- isFirstPhaseSuccess =
- firstPhaseWithRetry(node,
CONFIG.getLoadTsFileRetryCountOnRegionChange());
+ isFirstPhaseSuccess = firstPhase(node);
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() -
startTime);
@@ -259,9 +259,23 @@ public class LoadTsFileScheduler implements IScheduler {
if (isLoadSuccess) {
stateMachine.transitionToFinished();
} else {
+ final StringBuilder failedTsFiles =
+ new StringBuilder(
+ !tsFileNodeList.isEmpty()
+ ? tsFileNodeList.get(0).getTsFileResource().getTsFilePath()
+ : "");
+ final ListIterator<Integer> iterator =
failedTsFileNodeIndexes.listIterator(1);
+ while (iterator.hasNext()) {
+ failedTsFiles
+ .append(", ")
+
.append(tsFileNodeList.get(iterator.next()).getTsFileResource().getTsFilePath());
+ }
final long startTime = System.nanoTime();
try {
// if failed to load some TsFiles, then try to convert the TsFiles
to Tablets
+ LOGGER.info(
+ "Load TsFile(s) failed, will try to convert to tablets and
insert. Failed TsFiles: {}",
+ failedTsFiles);
convertFailedTsFilesToTabletsAndRetry();
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
@@ -273,30 +287,7 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
- private boolean firstPhaseWithRetry(LoadSingleTsFileNode node, int
retryCountOnRegionChange) {
- retryCountOnRegionChange = Math.max(0, retryCountOnRegionChange);
- while (true) {
- try {
- return firstPhase(node);
- } catch (RegionReplicaSetChangedException e) {
- if (retryCountOnRegionChange > 0) {
- LOGGER.warn(
- "Region replica set changed during loading TsFile {}, maybe due
to region migration, will retry for {} times.",
- node.getTsFileResource(),
- retryCountOnRegionChange);
- retryCountOnRegionChange--;
- } else {
- stateMachine.transitionToFailed(e);
- LOGGER.warn(
- "Region replica set changed during loading TsFile {} after
retry.",
- node.getTsFileResource());
- return false;
- }
- }
- }
- }
-
- private boolean firstPhase(LoadSingleTsFileNode node) throws
RegionReplicaSetChangedException {
+ private boolean firstPhase(LoadSingleTsFileNode node) {
final TsFileDataManager tsFileDataManager = new TsFileDataManager(this,
node, block);
try {
new TsFileSplitter(
@@ -306,8 +297,6 @@ public class LoadTsFileScheduler implements IScheduler {
stateMachine.transitionToFailed(new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
return false;
}
- } catch (RegionReplicaSetChangedException e) {
- throw e;
} catch (IllegalStateException e) {
stateMachine.transitionToFailed(e);
LOGGER.warn(
@@ -657,7 +646,7 @@ public class LoadTsFileScheduler implements IScheduler {
dataSize -= pieceNode.getDataSize();
block.reduceMemoryUsage(pieceNode.getDataSize());
- regionId2ReplicaSetAndNode.put(
+ regionId2ReplicaSetAndNode.replace(
sortedRegionId,
new Pair<>(
replicaSet,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index cc0b852e3b0..d88703b5b0c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -904,10 +904,20 @@ public class StorageEngine implements IService {
LoadTsFileRateLimiter.getInstance().acquire(pieceNode.getDataSize());
+ final DataRegion dataRegion = getDataRegion(dataRegionId);
+ if (dataRegion == null) {
+ LOGGER.warn(
+ "DataRegion {} not found on this DataNode when writing piece node"
+ + "of TsFile {} (maybe due to region migration), will skip.",
+ dataRegionId,
+ pieceNode.getTsFile());
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
try {
- loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId),
pieceNode, uuid);
+ loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
} catch (IOException e) {
- LOGGER.error(
+ LOGGER.warn(
"IO error when writing piece node of TsFile {} to DataRegion {}.",
pieceNode.getTsFile(),
dataRegionId,
@@ -915,6 +925,15 @@ public class StorageEngine implements IService {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
return status;
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Exception occurred when writing piece node of TsFile {} to
DataRegion {}.",
+ pieceNode.getTsFile(),
+ dataRegionId,
+ e);
+ status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+ status.setMessage(e.getMessage());
+ return status;
}
return RpcUtils.SUCCESS_STATUS;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index 222fdb5d93f..d37316a2908 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -109,6 +109,10 @@ public class
LoadTreeStatementDataTypeConvertExecutionVisitor
|| result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| result.getCode()
==
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
+ LOGGER.warn(
+ "Failed to convert data type for LoadTsFileStatement: {},
status code is {}.",
+ loadTsFileStatement,
+ result.getCode());
return Optional.empty();
}
}