This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b5f54b9ff86 Fix IoTConsensusV2 receiver writer borrow race (#17495)
b5f54b9ff86 is described below

commit b5f54b9ff8678d262e37925c6660f7bc3f509c7c
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 17 10:09:43 2026 +0800

    Fix IoTConsensusV2 receiver writer borrow race (#17495)
    
    * Fix IoTConsensusV2 receiver writer borrow race
    
    * Keep IoTConsensusV2 writer borrow fast path
    
    * fix data race
---
 .../iotconsensusv2/IoTConsensusV2Receiver.java     | 58 +++++++++++-----------
 1 file changed, 28 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index adf80654445..e0ef8e4072c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -1001,47 +1001,45 @@ public class IoTConsensusV2Receiver {
       return tsFileWriter.orElse(null);
     }
 
-    @SuppressWarnings("java:S3655")
     public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId 
commitId) {
-      Optional<IoTConsensusV2TsFileWriter> tsFileWriter =
+      final Optional<IoTConsensusV2TsFileWriter> correspondingWriter =
           iotConsensusV2TsFileWriterPool.stream()
               .filter(
                   item ->
                       item.isUsed()
                           && Objects.equals(commitId, 
item.getCommitIdOfCorrespondingHolderEvent()))
               .findFirst();
+      if (correspondingWriter.isPresent()) {
+        return correspondingWriter.get().refreshLastUsedTs();
+      }
 
-      // If the TsFileInsertionEvent is first using tsFileWriter, we will find 
the first available
-      // buffer for it.
-      if (!tsFileWriter.isPresent()) {
-        // We should synchronously find the idle writer to avoid concurrency 
issues.
-        lock.lock();
-        try {
-          // We need to check tsFileWriter.isPresent() here. Since there may 
be both retry-sent
-          // tsfile
-          // events and real-time-sent tsfile events, causing the receiver's 
tsFileWriter load to
-          // exceed IOTDB_CONFIG.getIoTConsensusV2PipelineSize().
-          while (!tsFileWriter.isPresent()) {
-            tsFileWriter =
-                iotConsensusV2TsFileWriterPool.stream().filter(item -> 
!item.isUsed()).findFirst();
-            condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
+      lock.lock();
+      try {
+        while (true) {
+          final Optional<IoTConsensusV2TsFileWriter> idleWriter =
+              iotConsensusV2TsFileWriterPool.stream().filter(item -> 
!item.isUsed()).findFirst();
+          if (idleWriter.isPresent()) {
+            final IoTConsensusV2TsFileWriter writer = idleWriter.get();
+            // Publish commitId before marking the writer as used so lock-free 
lookup callers
+            // observing isUsed=true can always see the bound commitId as well.
+            writer.setCommitIdOfCorrespondingHolderEvent(commitId);
+            writer.setUsed(true);
+            return writer.refreshLastUsedTs();
           }
-          tsFileWriter.get().setUsed(true);
-          tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
-        } catch (final InterruptedException e) {
-          Thread.currentThread().interrupt();
-          final String errorStr =
-              String.format(
-                  "IoTConsensusV2%s: receiver thread get interrupted when 
waiting for borrowing tsFileWriter.",
-                  consensusPipeName);
-          LOGGER.warn(errorStr);
-          throw new RuntimeException(errorStr);
-        } finally {
-          lock.unlock();
+
+          condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
         }
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        final String errorStr =
+            String.format(
+                "IoTConsensusV2%s: receiver thread get interrupted when 
waiting for borrowing tsFileWriter.",
+                consensusPipeName);
+        LOGGER.warn(errorStr);
+        throw new RuntimeException(errorStr);
+      } finally {
+        lock.unlock();
       }
-
-      return tsFileWriter.get().refreshLastUsedTs();
     }
 
     private void checkZombieTsFileWriter() {

Reply via email to