This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 8ebfcb99883 Load: convert to insert tablet on region replica set 
changes (#14717) (#14833)
8ebfcb99883 is described below

commit 8ebfcb99883be1f9f61b56d0b7c9056d54b01414
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Feb 14 18:00:45 2025 +0800

    Load: convert to insert tablet on region replica set changes (#14717) 
(#14833)
    
    * Load: convert to insert tablet on region replica set changes (#14717)
    
    (cherry picked from commit 7ac71fb2a33fdaae4fe56804ce0a2c48640dbed8)
    
    * resolve
---
 .../plan/analyze/LoadTsFileAnalyzer.java           |  7 ++++
 .../plan/node/load/LoadTsFilePieceNode.java        |  6 +--
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 47 +++++++++-------------
 .../iotdb/db/storageengine/StorageEngine.java      | 23 ++++++++++-
 ...eeStatementDataTypeConvertExecutionVisitor.java |  4 ++
 5 files changed, 53 insertions(+), 34 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 5604b687068..30cd8c23389 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -236,9 +236,16 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
             : null;
 
     if (status == null) {
+      LOGGER.warn(
+          "Load: Failed to convert to tablets from statement {}. Status is 
null.",
+          loadTsFileStatement);
       analysis.setFailStatus(
           new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
     } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
+      LOGGER.warn(
+          "Load: Failed to convert to tablets from statement {}. Status: {}",
+          loadTsFileStatement,
+          status);
       analysis.setFailStatus(status);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index 5d4f02378fc..f578148e014 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -155,9 +155,9 @@ public class LoadTsFilePieceNode extends WritePlanNode {
     InputStream stream = new ByteArrayInputStream(buffer.array());
     try {
       ReadWriteIOUtils.readShort(stream); // read PlanNodeType
-      File tsFile = new File(ReadWriteIOUtils.readString(stream));
-      LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new 
PlanNodeId(""), tsFile);
-      int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
+      final File tsFile = new File(ReadWriteIOUtils.readString(stream));
+      final LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new 
PlanNodeId(""), tsFile);
+      final int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
       for (int i = 0; i < tsFileDataSize; i++) {
         TsFileData tsFileData = TsFileData.deserialize(stream);
         pieceNode.addTsFileData(tsFileData);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index be3c9345f80..eaa23a59cf2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -90,6 +90,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -204,8 +205,7 @@ public class LoadTsFileScheduler implements IScheduler {
             long startTime = System.nanoTime();
             final boolean isFirstPhaseSuccess;
             try {
-              isFirstPhaseSuccess =
-                  firstPhaseWithRetry(node, 
CONFIG.getLoadTsFileRetryCountOnRegionChange());
+              isFirstPhaseSuccess = firstPhase(node);
             } finally {
               LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
                   LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - 
startTime);
@@ -259,9 +259,23 @@ public class LoadTsFileScheduler implements IScheduler {
       if (isLoadSuccess) {
         stateMachine.transitionToFinished();
       } else {
+        final StringBuilder failedTsFiles =
+            new StringBuilder(
+                !tsFileNodeList.isEmpty()
+                    ? tsFileNodeList.get(0).getTsFileResource().getTsFilePath()
+                    : "");
+        final ListIterator<Integer> iterator = 
failedTsFileNodeIndexes.listIterator(1);
+        while (iterator.hasNext()) {
+          failedTsFiles
+              .append(", ")
+              
.append(tsFileNodeList.get(iterator.next()).getTsFileResource().getTsFilePath());
+        }
         final long startTime = System.nanoTime();
         try {
           // if failed to load some TsFiles, then try to convert the TsFiles 
to Tablets
+          LOGGER.info(
+              "Load TsFile(s) failed, will try to convert to tablets and 
insert. Failed TsFiles: {}",
+              failedTsFiles);
           convertFailedTsFilesToTabletsAndRetry();
         } finally {
           LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
@@ -273,30 +287,7 @@ public class LoadTsFileScheduler implements IScheduler {
     }
   }
 
-  private boolean firstPhaseWithRetry(LoadSingleTsFileNode node, int 
retryCountOnRegionChange) {
-    retryCountOnRegionChange = Math.max(0, retryCountOnRegionChange);
-    while (true) {
-      try {
-        return firstPhase(node);
-      } catch (RegionReplicaSetChangedException e) {
-        if (retryCountOnRegionChange > 0) {
-          LOGGER.warn(
-              "Region replica set changed during loading TsFile {}, maybe due 
to region migration, will retry for {} times.",
-              node.getTsFileResource(),
-              retryCountOnRegionChange);
-          retryCountOnRegionChange--;
-        } else {
-          stateMachine.transitionToFailed(e);
-          LOGGER.warn(
-              "Region replica set changed during loading TsFile {} after 
retry.",
-              node.getTsFileResource());
-          return false;
-        }
-      }
-    }
-  }
-
-  private boolean firstPhase(LoadSingleTsFileNode node) throws 
RegionReplicaSetChangedException {
+  private boolean firstPhase(LoadSingleTsFileNode node) {
     final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, 
node, block);
     try {
       new TsFileSplitter(
@@ -306,8 +297,6 @@ public class LoadTsFileScheduler implements IScheduler {
         stateMachine.transitionToFailed(new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
         return false;
       }
-    } catch (RegionReplicaSetChangedException e) {
-      throw e;
     } catch (IllegalStateException e) {
       stateMachine.transitionToFailed(e);
       LOGGER.warn(
@@ -657,7 +646,7 @@ public class LoadTsFileScheduler implements IScheduler {
 
           dataSize -= pieceNode.getDataSize();
           block.reduceMemoryUsage(pieceNode.getDataSize());
-          regionId2ReplicaSetAndNode.put(
+          regionId2ReplicaSetAndNode.replace(
               sortedRegionId,
               new Pair<>(
                   replicaSet,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index cc0b852e3b0..d88703b5b0c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -904,10 +904,20 @@ public class StorageEngine implements IService {
 
     LoadTsFileRateLimiter.getInstance().acquire(pieceNode.getDataSize());
 
+    final DataRegion dataRegion = getDataRegion(dataRegionId);
+    if (dataRegion == null) {
+      LOGGER.warn(
+          "DataRegion {} not found on this DataNode when writing piece node"
+              + "of TsFile {} (maybe due to region migration), will skip.",
+          dataRegionId,
+          pieceNode.getTsFile());
+      return RpcUtils.SUCCESS_STATUS;
+    }
+
     try {
-      loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId), 
pieceNode, uuid);
+      loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
     } catch (IOException e) {
-      LOGGER.error(
+      LOGGER.warn(
           "IO error when writing piece node of TsFile {} to DataRegion {}.",
           pieceNode.getTsFile(),
           dataRegionId,
@@ -915,6 +925,15 @@ public class StorageEngine implements IService {
       status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
       status.setMessage(e.getMessage());
       return status;
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Exception occurred when writing piece node of TsFile {} to 
DataRegion {}.",
+          pieceNode.getTsFile(),
+          dataRegionId,
+          e);
+      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+      status.setMessage(e.getMessage());
+      return status;
     }
 
     return RpcUtils.SUCCESS_STATUS;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index 222fdb5d93f..d37316a2908 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -109,6 +109,10 @@ public class 
LoadTreeStatementDataTypeConvertExecutionVisitor
               || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
               || result.getCode()
                   == 
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
+            LOGGER.warn(
+                "Failed to convert data type for LoadTsFileStatement: {}, 
status code is {}.",
+                loadTsFileStatement,
+                result.getCode());
             return Optional.empty();
           }
         }

Reply via email to