This is an automated email from the ASF dual-hosted git repository. pengjunzhi pushed a commit to branch fixFileWriter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8fba211c8f05f044776782d0d56f5f9f7c325b41 Author: 彭俊植 <[email protected]> AuthorDate: Wed Aug 13 19:19:56 2025 +0800 fix RandomAccessFileWriter --- .../pipeconsensus/PipeConsensusReceiver.java | 93 ++++++++++------------ 1 file changed, 43 insertions(+), 50 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 168e6406745..96018319bea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -168,6 +168,9 @@ public class PipeConsensusReceiver { // will reject synchronization. TPipeConsensusTransferResp resp = preCheckForReceiver(req); if (resp != null) { + // release tsFileWriter if pre-check failed as leader will resend the whole tsFileEvent + releaseTsFileWriter( + pipeConsensusTsFileWriterPool.tryToFindCorrespondingWriter(req.getCommitId()), false); return resp; } @@ -400,20 +403,8 @@ public class PipeConsensusReceiver { } finally { // Exception may occur when disk system go wrong. At this time, we may reset all resource // and receive this file from scratch when leader will try to resend this file from - // scratch - // as well. - closeCurrentWritingFileWriter(tsFileWriter, false); - deleteCurrentWritingFile(tsFileWriter); - // must return tsfileWriter after deleting its file. - try { - tsFileWriter.returnSelf(consensusPipeName); - } catch (IOException | DiskSpaceInsufficientException returnException) { - LOGGER.warn( - "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", - consensusPipeName, - tsFileWriter, - returnException); - } + // scratch as well. + releaseTsFileWriter(tsFileWriter, false); } } } finally { @@ -432,7 +423,6 @@ public class PipeConsensusReceiver { File writingFile = tsFileWriter.getWritingFile(); RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter(); - boolean isReturnTsFileWriter = false; try { if (isWritingFileNonAvailable(tsFileWriter)) { final TSStatus status = @@ -481,8 +471,6 @@ public class PipeConsensusReceiver { pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() - endPreCheckNanos); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // if transfer success, disk buffer will be released. - isReturnTsFileWriter = true; LOGGER.info( "PipeConsensus-PipeName-{}: Seal file {} successfully.", consensusPipeName, @@ -521,20 +509,7 @@ public class PipeConsensusReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod (if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(tsFileWriter, false); - deleteCurrentWritingFile(tsFileWriter); - // must return tsfileWriter after deleting its file. - if (isReturnTsFileWriter) { - try { - tsFileWriter.returnSelf(consensusPipeName); - } catch (IOException | DiskSpaceInsufficientException e) { - LOGGER.warn( - "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", - consensusPipeName, - tsFileWriter, - e); - } - } + releaseTsFileWriter(tsFileWriter, false); } } @@ -557,7 +532,6 @@ public class PipeConsensusReceiver { req.getFileNames().stream() .map(fileName -> new File(currentWritingDirPath, fileName)) .collect(Collectors.toList()); - boolean isReturnTsFileWriter = false; try { if (isWritingFileNonAvailable(tsFileWriter)) { final TSStatus status = @@ -620,8 +594,6 @@ public class PipeConsensusReceiver { pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() - endPreCheckNanos); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // if transfer success, disk buffer will be released. - isReturnTsFileWriter = true; LOGGER.info( "PipeConsensus-PipeName-{}: Seal file with mods {} successfully.", consensusPipeName, @@ -649,22 +621,28 @@ public class PipeConsensusReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod(if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(tsFileWriter, false); + releaseTsFileWriter(tsFileWriter, false); // Clear the directory instead of only deleting the referenced files in seal request // to avoid previously undeleted file being redundant when transferring multi files IoTDBReceiverAgent.cleanPipeReceiverDir(currentWritingDirPath); - // must return tsfileWriter after deleting its file. - if (isReturnTsFileWriter) { - try { - tsFileWriter.returnSelf(consensusPipeName); - } catch (IOException | DiskSpaceInsufficientException e) { - LOGGER.warn( - "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", - consensusPipeName, - tsFileWriter, - e); - } - } + } + } + + private void releaseTsFileWriter( + PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) { + if (tsFileWriter == null) { + return; + } + closeCurrentWritingFileWriter(tsFileWriter, fsyncBeforeClose); + deleteCurrentWritingFile(tsFileWriter); + try { + tsFileWriter.returnSelf(consensusPipeName); + } catch (IOException | DiskSpaceInsufficientException e) { + LOGGER.warn( + "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.", + consensusPipeName, + tsFileWriter, + e); } } @@ -882,6 +860,7 @@ public class PipeConsensusReceiver { tsFileWriter.getWritingFile() == null ? "null" : tsFileWriter.getWritingFile().getPath()); + tsFileWriter.setWritingFileWriter(null); } catch (IOException e) { LOGGER.warn( "PipeConsensus-PipeName-{}: Failed to close current writing file writer {}, because {}.", @@ -892,7 +871,6 @@ public class PipeConsensusReceiver { e.getMessage(), e); } - tsFileWriter.setWritingFileWriter(null); } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug( @@ -1107,6 +1085,17 @@ public class PipeConsensusReceiver { IOTDB_CONFIG.getTsFileWriterCheckInterval()); } + public PipeConsensusTsFileWriter tryToFindCorrespondingWriter(TCommitId commitId) { + Optional<PipeConsensusTsFileWriter> tsFileWriter = + pipeConsensusTsFileWriterPool.stream() + .filter( + item -> + item.isUsed() + && Objects.equals(commitId, item.getCommitIdOfCorrespondingHolderEvent())) + .findFirst(); + return tsFileWriter.orElse(null); + } + @SuppressWarnings("java:S3655") public PipeConsensusTsFileWriter borrowCorrespondingWriter(TCommitId commitId) { Optional<PipeConsensusTsFileWriter> tsFileWriter = @@ -1291,13 +1280,17 @@ public class PipeConsensusReceiver { return writingFileWriter; } - public void setWritingFileWriter(RandomAccessFile writingFileWriter) { + public void setWritingFileWriter(RandomAccessFile writingFileWriter) throws IOException { this.writingFileWriter = writingFileWriter; if (writingFileWriter == null) { LOGGER.info( "PipeConsensus-{}: TsFileWriter-{} set null writing file writer", consensusPipeName.toString(), index); + } else { + // seek to the end of the file to ensure that the next piece will be appended to the end of + // the file. + this.writingFileWriter.seek(this.writingFileWriter.length()); } } @@ -1353,6 +1346,7 @@ public class PipeConsensusReceiver { "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file writer was closed.", consensusPipeName.toString(), index); + setWritingFileWriter(null); } catch (Exception e) { LOGGER.warn( "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing file writer error.", @@ -1360,7 +1354,6 @@ public class PipeConsensusReceiver { index, e); } - setWritingFileWriter(null); } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug(
