This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-garbage-cleaning
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-garbage-cleaning by this
push:
new 52359b34b1f fix
52359b34b1f is described below
commit 52359b34b1f1d9b77cd65d96f02847bcf77c5e48
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 18:32:22 2026 +0800
fix
---
.../iotconsensusv2/IoTConsensusV2Receiver.java | 25 ++++++++++++++++------
.../IoTConsensusV2ReceiverAgent.java | 2 +-
2 files changed, 20 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index 77eb1d0a6fc..adf80654445 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -555,6 +555,15 @@ public class IoTConsensusV2Receiver {
}
}
+ if (req.getFileNames().size() < 2) {
+ return new TIoTConsensusV2TransferResp(
+ RpcUtils.getStatus(
+ TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
+ String.format(
+ "Failed to seal file %s, because the number of files is
less than 2.",
+ req.getFileNames())));
+ }
+
// Sync here is necessary to ensure that the data is written to the
disk. Or data region may
// load the file before the data is written to the disk and cause
unexpected behavior after
// system restart. (e.g., empty file in data region's data directory)
@@ -952,6 +961,7 @@ public class IoTConsensusV2Receiver {
private class IoTConsensusV2TsFileWriterPool {
private final Lock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
private final List<IoTConsensusV2TsFileWriter>
iotConsensusV2TsFileWriterPool =
new ArrayList<>();
private final ConsensusPipeName consensusPipeName;
@@ -1014,15 +1024,18 @@ public class IoTConsensusV2Receiver {
while (!tsFileWriter.isPresent()) {
tsFileWriter =
iotConsensusV2TsFileWriterPool.stream().filter(item ->
!item.isUsed()).findFirst();
- Thread.sleep(RETRY_WAIT_TIME);
+ condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
}
tsFileWriter.get().setUsed(true);
tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
- LOGGER.warn(
- "IoTConsensusV2{}: receiver thread get interrupted when waiting
for borrowing tsFileWriter.",
- consensusPipeName);
+ final String errorStr =
+ String.format(
+ "IoTConsensusV2%s: receiver thread get interrupted when
waiting for borrowing tsFileWriter.",
+ consensusPipeName);
+ LOGGER.warn(errorStr);
+ throw new RuntimeException(errorStr);
} finally {
lock.unlock();
}
@@ -1057,7 +1070,7 @@ public class IoTConsensusV2Receiver {
&& tsFileWriter.isUsed()) {
try {
Thread.sleep(RETRY_WAIT_TIME);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
"IoTConsensusV2-PipeName-{}: receiver thread get
interrupted when exiting.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
index 481d8dd7370..be2659c1cf4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
@@ -243,7 +243,7 @@ public class IoTConsensusV2ReceiverAgent implements
ConsensusPipeReceiver {
ConsensusPipeName consensusPipeName = receiverEntry.getKey();
AtomicReference<IoTConsensusV2Receiver> receiverReference =
receiverEntry.getValue();
- if (receiverReference != null) {
+ if (receiverReference != null && receiverReference.get() !=
null) {
receiverReference.get().handleExit();
receiverReference.set(null);
}