This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d82dffdbc8e IoTConsensusV2: Fix npe when data region is not existed 
(#14180)
d82dffdbc8e is described below

commit d82dffdbc8ece5589ccf9154c7fdedb37fbfde9f
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Nov 22 22:05:49 2024 +0800

    IoTConsensusV2: Fix npe when data region is not existed (#14180)
---
 .../pipeconsensus/PipeConsensusReceiver.java       | 34 +++++++++++++++-------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 13cfdbbdf49..1e1ae84f3a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -88,6 +88,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils.generateTsFileResource;
+
 public class PipeConsensusReceiver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusReceiver.class);
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
@@ -653,9 +655,19 @@ public class PipeConsensusReceiver {
 
   private TSStatus loadFileToDataRegion(String filePath, ProgressIndex 
progressIndex)
       throws IOException, LoadFileException {
-    StorageEngine.getInstance()
-        .getDataRegion(((DataRegionId) consensusGroupId))
-        .loadNewTsFile(generateTsFileResource(filePath, progressIndex), true, 
false);
+    DataRegion region =
+        StorageEngine.getInstance().getDataRegion(((DataRegionId) 
consensusGroupId));
+    if (region != null) {
+      TsFileResource resource = generateTsFileResource(filePath, 
progressIndex);
+      region.loadNewTsFile(resource, true, false);
+    } else {
+      // Data region is null indicates that dr has been removed or migrated. 
In those cases, there
+      // is no need to replicate data. we just return success to avoid leader 
keeping retry
+      LOGGER.info(
+          "PipeConsensus-PipeName-{}: skip load tsfile-{} when sealing, 
because this region has been removed or migrated.",
+          consensusPipeName,
+          filePath);
+    }
     return RpcUtils.SUCCESS_STATUS;
   }
 
@@ -689,14 +701,16 @@ public class PipeConsensusReceiver {
   }
 
   private void updateWritePointCountMetrics(long writePointCount) {
-    final DataRegion dataRegion =
-        StorageEngine.getInstance().getDataRegion(((DataRegionId) 
consensusGroupId));
-    dataRegion
-        .getNonSystemDatabaseName()
+    Optional.ofNullable(
+            StorageEngine.getInstance().getDataRegion(((DataRegionId) 
consensusGroupId)))
         .ifPresent(
-            databaseName ->
-                LoadTsFileManager.updateWritePointCountMetrics(
-                    dataRegion, databaseName, writePointCount, true));
+            dataRegion ->
+                dataRegion
+                    .getNonSystemDatabaseName()
+                    .ifPresent(
+                        databaseName ->
+                            LoadTsFileManager.updateWritePointCountMetrics(
+                                dataRegion, databaseName, writePointCount, 
true)));
   }
 
   private TsFileResource generateTsFileResource(String filePath, ProgressIndex 
progressIndex)

Reply via email to