This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 55c193115cd Pipe: Add fsync option for file receiver to avoid data
region unexpected behavior after system restart (#13984) (#13993)
55c193115cd is described below
commit 55c193115cd7264435db24f0a9838a7f2e10a7c0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Nov 5 18:20:49 2024 +0800
Pipe: Add fsync option for file receiver to avoid data region unexpected
behavior after system restart (#13984) (#13993)
(cherry picked from commit b2cda441caaaddf576e38a5791e8391aacf34ebb)
---
.../pipeconsensus/PipeConsensusReceiver.java | 20 +++++++++++++----
.../apache/iotdb/commons/conf/CommonConfig.java | 10 +++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +++++
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +++++
.../commons/pipe/receiver/IoTDBFileReceiver.java | 26 ++++++++++++++++++----
5 files changed, 58 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index bf9adca25d7..6162a7dd448 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -408,6 +408,10 @@ public class PipeConsensusReceiver {
final String fileAbsolutePath = writingFile.getAbsolutePath();
+ // Sync here is necessary to ensure that the data is written to the
disk. Or data region may
+ // load the file before the data is written to the disk and cause
unexpected behavior after
+ // system restart. (e.g., empty file in data region's data directory)
+ writingFileWriter.getFD().sync();
// 1. The writing file writer must be closed, otherwise it may cause
concurrent errors during
// the process of loading tsfile when parsing tsfile.
//
@@ -473,7 +477,7 @@ public class PipeConsensusReceiver {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod (if exists) should be
retransmitted by the
// sender.
- closeCurrentWritingFileWriter(tsFileWriter);
+ closeCurrentWritingFileWriter(tsFileWriter, false);
deleteCurrentWritingFile(tsFileWriter);
}
}
@@ -524,6 +528,10 @@ public class PipeConsensusReceiver {
}
}
+ // Sync here is necessary to ensure that the data is written to the
disk. Or data region may
+ // load the file before the data is written to the disk and cause
unexpected behavior after
+ // system restart. (e.g., empty file in data region's data directory)
+ writingFileWriter.getFD().sync();
// 1. The writing file writer must be closed, otherwise it may cause
concurrent errors during
// the process of loading tsfile when parsing tsfile.
//
@@ -582,7 +590,7 @@ public class PipeConsensusReceiver {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the
// sender.
- closeCurrentWritingFileWriter(tsFileWriter);
+ closeCurrentWritingFileWriter(tsFileWriter, false);
// Clear the directory instead of only deleting the referenced files in
seal request
// to avoid previously undeleted file being redundant when transferring
multi files
IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());
@@ -777,10 +785,14 @@ public class PipeConsensusReceiver {
return !offsetCorrect;
}
- private void closeCurrentWritingFileWriter(PipeConsensusTsFileWriter
tsFileWriter) {
+ private void closeCurrentWritingFileWriter(
+ PipeConsensusTsFileWriter tsFileWriter, boolean fsyncAfterClose) {
if (tsFileWriter.getWritingFileWriter() != null) {
try {
tsFileWriter.getWritingFileWriter().close();
+ if (fsyncAfterClose) {
+ tsFileWriter.getWritingFileWriter().getFD().sync();
+ }
LOGGER.info(
"PipeConsensus-PipeName-{}: Current writing file writer {} was
closed.",
consensusPipeName,
@@ -861,7 +873,7 @@ public class PipeConsensusReceiver {
fileName,
tsFileWriter.getWritingFile() == null ? "null" :
tsFileWriter.getWritingFile().getPath());
- closeCurrentWritingFileWriter(tsFileWriter);
+ closeCurrentWritingFileWriter(tsFileWriter, !isSingleFile);
// If there are multiple files we can not delete the current file
// instead they will be deleted after seal request
if (tsFileWriter.getWritingFile() != null && isSingleFile) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b3162ee7567..c8b8129546f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -193,6 +193,8 @@ public class CommonConfig {
private boolean pipeHardLinkWALEnabled = false;
+ private boolean pipeFileReceiverFsyncEnabled = true;
+
private int pipeRealTimeQueuePollHistoryThreshold = 100;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
@@ -672,6 +674,14 @@ public class CommonConfig {
this.pipeHardLinkWALEnabled = pipeHardLinkWALEnabled;
}
+ public boolean getPipeFileReceiverFsyncEnabled() {
+ return pipeFileReceiverFsyncEnabled;
+ }
+
+ public void setPipeFileReceiverFsyncEnabled(boolean
pipeFileReceiverFsyncEnabled) {
+ this.pipeFileReceiverFsyncEnabled = pipeFileReceiverFsyncEnabled;
+ }
+
public int getPipeDataStructureTabletRowSize() {
return pipeDataStructureTabletRowSize;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index accbdc89895..4d2799d178b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -266,6 +266,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_hardlink_wal_enabled",
Boolean.toString(config.getPipeHardLinkWALEnabled()))));
+ config.setPipeFileReceiverFsyncEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_file_receiver_fsync_enabled",
+ Boolean.toString(config.getPipeFileReceiverFsyncEnabled()))));
config.setPipeDataStructureTabletRowSize(
Integer.parseInt(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index c41841264fa..9cb99a226ba 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -54,6 +54,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeHardLinkWALEnabled();
}
+ public boolean getPipeFileReceiverFsyncEnabled() {
+ return COMMON_CONFIG.getPipeFileReceiverFsyncEnabled();
+ }
+
/////////////////////////////// Tablet ///////////////////////////////
public int getPipeDataStructureTabletRowSize() {
@@ -335,6 +339,7 @@ public class PipeConfig {
LOGGER.info("PipeHardlinkTsFileDirName: {}",
getPipeHardlinkTsFileDirName());
LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
+ LOGGER.info("PipeFileReceiverFsyncEnabled: {}",
getPipeFileReceiverFsyncEnabled());
LOGGER.info("PipeDataStructureTabletRowSize: {}",
getPipeDataStructureTabletRowSize());
LOGGER.info("PipeDataStructureTabletSizeInBytes: {}",
getPipeDataStructureTabletSizeInBytes());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 59bcc06c45c..14bd34ebea8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
@@ -72,6 +73,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+ private static final boolean IS_FSYNC_ENABLED =
+ PipeConfig.getInstance().getPipeFileReceiverFsyncEnabled();
private File writingFile;
private RandomAccessFile writingFileWriter;
@@ -337,7 +340,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
fileName,
writingFile == null ? "null" : writingFile.getPath());
- closeCurrentWritingFileWriter();
+ closeCurrentWritingFileWriter(!isSingleFile);
// If there are multiple files we can not delete the current file
// instead they will be deleted after seal request
if (writingFile != null && isSingleFile) {
@@ -372,9 +375,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
return writingFile != null && writingFile.getName().equals(fileName);
}
- private void closeCurrentWritingFileWriter() {
+ private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
if (writingFileWriter != null) {
try {
+ if (IS_FSYNC_ENABLED && fsyncAfterClose) {
+ writingFileWriter.getFD().sync();
+ }
writingFileWriter.close();
LOGGER.info(
"Receiver id = {}: Current writing file writer {} was closed.",
@@ -467,6 +473,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
final String fileAbsolutePath = writingFile.getAbsolutePath();
+ // Sync here is necessary to ensure that the data is written to the
disk. Or data region may
+ // load the file before the data is written to the disk and cause
unexpected behavior after
+ // system restart. (e.g., empty file in data region's data directory)
+ if (IS_FSYNC_ENABLED) {
+ writingFileWriter.getFD().sync();
+ }
// 1. The writing file writer must be closed, otherwise it may cause
concurrent errors during
// the process of loading tsfile when parsing tsfile.
//
@@ -508,7 +520,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod (if exists) should be
retransmitted by the
// sender.
- closeCurrentWritingFileWriter();
+ closeCurrentWritingFileWriter(false);
deleteCurrentWritingFile();
}
}
@@ -543,6 +555,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
}
+ // Sync here is necessary to ensure that the data is written to the
disk. Or data region may
+ // load the file before the data is written to the disk and cause
unexpected behavior after
+ // system restart. (e.g., empty file in data region's data directory)
+ if (IS_FSYNC_ENABLED) {
+ writingFileWriter.getFD().sync();
+ }
// 1. The writing file writer must be closed, otherwise it may cause
concurrent errors during
// the process of loading tsfile when parsing tsfile.
//
@@ -583,7 +601,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the
// sender.
- closeCurrentWritingFileWriter();
+ closeCurrentWritingFileWriter(false);
// Clear the directory instead of only deleting the referenced files in
seal request
// to avoid previously undeleted file being redundant when transferring
multi files
IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());