This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 13984 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 22ffa5fadeaf8ddaa4caebf35633e844b2b56515 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Nov 5 14:44:57 2024 +0800 Pipe: Add fsync option for file receiver to avoid data region unexpected behavior after system restart (#13984) (cherry picked from commit b2cda441caaaddf576e38a5791e8391aacf34ebb) --- .../pipeconsensus/PipeConsensusReceiver.java | 20 +++++++++++++---- .../apache/iotdb/commons/conf/CommonConfig.java | 10 +++++++++ .../iotdb/commons/conf/CommonDescriptor.java | 5 +++++ .../iotdb/commons/pipe/config/PipeConfig.java | 5 +++++ .../commons/pipe/receiver/IoTDBFileReceiver.java | 26 ++++++++++++++++++---- 5 files changed, 58 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index bf9adca25d7..6162a7dd448 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -408,6 +408,10 @@ public class PipeConsensusReceiver { final String fileAbsolutePath = writingFile.getAbsolutePath(); + // Sync here is necessary to ensure that the data is written to the disk. Or data region may + // load the file before the data is written to the disk and cause unexpected behavior after + // system restart. (e.g., empty file in data region's data directory) + writingFileWriter.getFD().sync(); // 1. The writing file writer must be closed, otherwise it may cause concurrent errors during // the process of loading tsfile when parsing tsfile. // @@ -473,7 +477,7 @@ public class PipeConsensusReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod (if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(tsFileWriter); + closeCurrentWritingFileWriter(tsFileWriter, false); deleteCurrentWritingFile(tsFileWriter); } } @@ -524,6 +528,10 @@ public class PipeConsensusReceiver { } } + // Sync here is necessary to ensure that the data is written to the disk. Or data region may + // load the file before the data is written to the disk and cause unexpected behavior after + // system restart. (e.g., empty file in data region's data directory) + writingFileWriter.getFD().sync(); // 1. The writing file writer must be closed, otherwise it may cause concurrent errors during // the process of loading tsfile when parsing tsfile. // @@ -582,7 +590,7 @@ public class PipeConsensusReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod(if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(tsFileWriter); + closeCurrentWritingFileWriter(tsFileWriter, false); // Clear the directory instead of only deleting the referenced files in seal request // to avoid previously undeleted file being redundant when transferring multi files IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get()); @@ -777,10 +785,14 @@ public class PipeConsensusReceiver { return !offsetCorrect; } - private void closeCurrentWritingFileWriter(PipeConsensusTsFileWriter tsFileWriter) { + private void closeCurrentWritingFileWriter( + PipeConsensusTsFileWriter tsFileWriter, boolean fsyncAfterClose) { if (tsFileWriter.getWritingFileWriter() != null) { try { tsFileWriter.getWritingFileWriter().close(); + if (fsyncAfterClose) { + tsFileWriter.getWritingFileWriter().getFD().sync(); + } LOGGER.info( "PipeConsensus-PipeName-{}: Current writing file writer {} was closed.", consensusPipeName, @@ -861,7 +873,7 @@ public class PipeConsensusReceiver { fileName, tsFileWriter.getWritingFile() == null ? "null" : tsFileWriter.getWritingFile().getPath()); - closeCurrentWritingFileWriter(tsFileWriter); + closeCurrentWritingFileWriter(tsFileWriter, !isSingleFile); // If there are multiple files we can not delete the current file // instead they will be deleted after seal request if (tsFileWriter.getWritingFile() != null && isSingleFile) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index b3162ee7567..c8b8129546f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -193,6 +193,8 @@ public class CommonConfig { private boolean pipeHardLinkWALEnabled = false; + private boolean pipeFileReceiverFsyncEnabled = true; + private int pipeRealTimeQueuePollHistoryThreshold = 100; /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */ @@ -672,6 +674,14 @@ public class CommonConfig { this.pipeHardLinkWALEnabled = pipeHardLinkWALEnabled; } + public boolean getPipeFileReceiverFsyncEnabled() { + return pipeFileReceiverFsyncEnabled; + } + + public void setPipeFileReceiverFsyncEnabled(boolean pipeFileReceiverFsyncEnabled) { + this.pipeFileReceiverFsyncEnabled = pipeFileReceiverFsyncEnabled; + } + public int getPipeDataStructureTabletRowSize() { return pipeDataStructureTabletRowSize; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index accbdc89895..4d2799d178b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -266,6 +266,11 @@ public class CommonDescriptor { properties.getProperty( "pipe_hardlink_wal_enabled", Boolean.toString(config.getPipeHardLinkWALEnabled())))); + config.setPipeFileReceiverFsyncEnabled( + Boolean.parseBoolean( + properties.getProperty( + "pipe_file_receiver_fsync_enabled", + Boolean.toString(config.getPipeFileReceiverFsyncEnabled())))); config.setPipeDataStructureTabletRowSize( Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index c41841264fa..9cb99a226ba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -54,6 +54,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeHardLinkWALEnabled(); } + public boolean getPipeFileReceiverFsyncEnabled() { + return COMMON_CONFIG.getPipeFileReceiverFsyncEnabled(); + } + /////////////////////////////// Tablet /////////////////////////////// public int getPipeDataStructureTabletRowSize() { @@ -335,6 +339,7 @@ public class PipeConfig { LOGGER.info("PipeHardlinkTsFileDirName: {}", getPipeHardlinkTsFileDirName()); LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName()); LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled()); + LOGGER.info("PipeFileReceiverFsyncEnabled: {}", getPipeFileReceiverFsyncEnabled()); LOGGER.info("PipeDataStructureTabletRowSize: {}", getPipeDataStructureTabletRowSize()); LOGGER.info("PipeDataStructureTabletSizeInBytes: {}", getPipeDataStructureTabletSizeInBytes()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 59bcc06c45c..14bd34ebea8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; @@ -72,6 +73,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE; protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; + private static final boolean IS_FSYNC_ENABLED = + PipeConfig.getInstance().getPipeFileReceiverFsyncEnabled(); private File writingFile; private RandomAccessFile writingFileWriter; @@ -337,7 +340,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { fileName, writingFile == null ? "null" : writingFile.getPath()); - closeCurrentWritingFileWriter(); + closeCurrentWritingFileWriter(!isSingleFile); // If there are multiple files we can not delete the current file // instead they will be deleted after seal request if (writingFile != null && isSingleFile) { @@ -372,9 +375,12 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { return writingFile != null && writingFile.getName().equals(fileName); } - private void closeCurrentWritingFileWriter() { + private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) { if (writingFileWriter != null) { try { + if (IS_FSYNC_ENABLED && fsyncAfterClose) { + writingFileWriter.getFD().sync(); + } writingFileWriter.close(); LOGGER.info( "Receiver id = {}: Current writing file writer {} was closed.", @@ -467,6 +473,12 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { final String fileAbsolutePath = writingFile.getAbsolutePath(); + // Sync here is necessary to ensure that the data is written to the disk. Or data region may + // load the file before the data is written to the disk and cause unexpected behavior after + // system restart. (e.g., empty file in data region's data directory) + if (IS_FSYNC_ENABLED) { + writingFileWriter.getFD().sync(); + } // 1. The writing file writer must be closed, otherwise it may cause concurrent errors during // the process of loading tsfile when parsing tsfile. // @@ -508,7 +520,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod (if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(); + closeCurrentWritingFileWriter(false); deleteCurrentWritingFile(); } } @@ -543,6 +555,12 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } } + // Sync here is necessary to ensure that the data is written to the disk. Or data region may + // load the file before the data is written to the disk and cause unexpected behavior after + // system restart. (e.g., empty file in data region's data directory) + if (IS_FSYNC_ENABLED) { + writingFileWriter.getFD().sync(); + } // 1. The writing file writer must be closed, otherwise it may cause concurrent errors during // the process of loading tsfile when parsing tsfile. // @@ -583,7 +601,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod(if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(); + closeCurrentWritingFileWriter(false); // Clear the directory instead of only deleting the referenced files in seal request // to avoid previously undeleted file being redundant when transferring multi files IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());
