This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b5f54b9ff86 Fix IoTConsensusV2 receiver writer borrow race (#17495)
b5f54b9ff86 is described below
commit b5f54b9ff8678d262e37925c6660f7bc3f509c7c
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 17 10:09:43 2026 +0800
Fix IoTConsensusV2 receiver writer borrow race (#17495)
* Fix IoTConsensusV2 receiver writer borrow race
* Keep IoTConsensusV2 writer borrow fast path
* fix data race
---
.../iotconsensusv2/IoTConsensusV2Receiver.java | 58 +++++++++++-----------
1 file changed, 28 insertions(+), 30 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index adf80654445..e0ef8e4072c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -1001,47 +1001,45 @@ public class IoTConsensusV2Receiver {
return tsFileWriter.orElse(null);
}
- @SuppressWarnings("java:S3655")
public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId commitId) {
- Optional<IoTConsensusV2TsFileWriter> tsFileWriter =
+ final Optional<IoTConsensusV2TsFileWriter> correspondingWriter =
iotConsensusV2TsFileWriterPool.stream()
.filter(
item ->
item.isUsed()
&& Objects.equals(commitId,
item.getCommitIdOfCorrespondingHolderEvent()))
.findFirst();
+ if (correspondingWriter.isPresent()) {
+ return correspondingWriter.get().refreshLastUsedTs();
+ }
- // If the TsFileInsertionEvent is first using tsFileWriter, we will find the first available
- // buffer for it.
- if (!tsFileWriter.isPresent()) {
- // We should synchronously find the idle writer to avoid concurrency issues.
- lock.lock();
- try {
- // We need to check tsFileWriter.isPresent() here. Since there may be both retry-sent
- // tsfile
- // events and real-time-sent tsfile events, causing the receiver's tsFileWriter load to
- // exceed IOTDB_CONFIG.getIoTConsensusV2PipelineSize().
- while (!tsFileWriter.isPresent()) {
- tsFileWriter =
- iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
- condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
+ lock.lock();
+ try {
+ while (true) {
+ final Optional<IoTConsensusV2TsFileWriter> idleWriter =
+ iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
+ if (idleWriter.isPresent()) {
+ final IoTConsensusV2TsFileWriter writer = idleWriter.get();
+ // Publish commitId before marking the writer as used so lock-free lookup callers
+ // observing isUsed=true can always see the bound commitId as well.
+ writer.setCommitIdOfCorrespondingHolderEvent(commitId);
+ writer.setUsed(true);
+ return writer.refreshLastUsedTs();
}
- tsFileWriter.get().setUsed(true);
- tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- final String errorStr =
- String.format(
- "IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
- consensusPipeName);
- LOGGER.warn(errorStr);
- throw new RuntimeException(errorStr);
- } finally {
- lock.unlock();
+
+ condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
}
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ final String errorStr =
+ String.format(
+ "IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
+ consensusPipeName);
+ LOGGER.warn(errorStr);
+ throw new RuntimeException(errorStr);
+ } finally {
+ lock.unlock();
}
-
- return tsFileWriter.get().refreshLastUsedTs();
}
private void checkZombieTsFileWriter() {