This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-garbage-cleaning
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-garbage-cleaning by this push:
     new 52359b34b1f fix
52359b34b1f is described below

commit 52359b34b1f1d9b77cd65d96f02847bcf77c5e48
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 18:32:22 2026 +0800

    fix
---
 .../iotconsensusv2/IoTConsensusV2Receiver.java     | 25 ++++++++++++++++------
 .../IoTConsensusV2ReceiverAgent.java               |  2 +-
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index 77eb1d0a6fc..adf80654445 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -555,6 +555,15 @@ public class IoTConsensusV2Receiver {
         }
       }
 
+      if (req.getFileNames().size() < 2) {
+        return new TIoTConsensusV2TransferResp(
+            RpcUtils.getStatus(
+                TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
+                String.format(
+                    "Failed to seal file %s, because the number of files is 
less than 2.",
+                    req.getFileNames())));
+      }
+
       // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
       // load the file before the data is written to the disk and cause 
unexpected behavior after
       // system restart. (e.g., empty file in data region's data directory)
@@ -952,6 +961,7 @@ public class IoTConsensusV2Receiver {
 
   private class IoTConsensusV2TsFileWriterPool {
     private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
     private final List<IoTConsensusV2TsFileWriter> 
iotConsensusV2TsFileWriterPool =
         new ArrayList<>();
     private final ConsensusPipeName consensusPipeName;
@@ -1014,15 +1024,18 @@ public class IoTConsensusV2Receiver {
           while (!tsFileWriter.isPresent()) {
             tsFileWriter =
                 iotConsensusV2TsFileWriterPool.stream().filter(item -> 
!item.isUsed()).findFirst();
-            Thread.sleep(RETRY_WAIT_TIME);
+            condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
           }
           tsFileWriter.get().setUsed(true);
           tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
           Thread.currentThread().interrupt();
-          LOGGER.warn(
-              "IoTConsensusV2{}: receiver thread get interrupted when waiting 
for borrowing tsFileWriter.",
-              consensusPipeName);
+          final String errorStr =
+              String.format(
+                  "IoTConsensusV2%s: receiver thread get interrupted when 
waiting for borrowing tsFileWriter.",
+                  consensusPipeName);
+          LOGGER.warn(errorStr);
+          throw new RuntimeException(errorStr);
         } finally {
           lock.unlock();
         }
@@ -1057,7 +1070,7 @@ public class IoTConsensusV2Receiver {
                 && tsFileWriter.isUsed()) {
               try {
                 Thread.sleep(RETRY_WAIT_TIME);
-              } catch (InterruptedException e) {
+              } catch (final InterruptedException e) {
                 Thread.currentThread().interrupt();
                 LOGGER.warn(
                     "IoTConsensusV2-PipeName-{}: receiver thread get 
interrupted when exiting.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
index 481d8dd7370..be2659c1cf4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
@@ -243,7 +243,7 @@ public class IoTConsensusV2ReceiverAgent implements ConsensusPipeReceiver {
                 ConsensusPipeName consensusPipeName = receiverEntry.getKey();
                 AtomicReference<IoTConsensusV2Receiver> receiverReference =
                     receiverEntry.getValue();
-                if (receiverReference != null) {
+                if (receiverReference != null && receiverReference.get() != 
null) {
                   receiverReference.get().handleExit();
                   receiverReference.set(null);
                 }

Reply via email to