This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 55c193115cd Pipe: Add fsync option for file receiver to avoid data 
region unexpected behavior after system restart (#13984) (#13993)
55c193115cd is described below

commit 55c193115cd7264435db24f0a9838a7f2e10a7c0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Nov 5 18:20:49 2024 +0800

    Pipe: Add fsync option for file receiver to avoid data region unexpected 
behavior after system restart (#13984) (#13993)
    
    (cherry picked from commit b2cda441caaaddf576e38a5791e8391aacf34ebb)
---
 .../pipeconsensus/PipeConsensusReceiver.java       | 20 +++++++++++++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 10 +++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 +++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  5 +++++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 26 ++++++++++++++++++----
 5 files changed, 58 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index bf9adca25d7..6162a7dd448 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -408,6 +408,10 @@ public class PipeConsensusReceiver {
 
       final String fileAbsolutePath = writingFile.getAbsolutePath();
 
+      // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
+      // load the file before the data is written to the disk and cause 
unexpected behavior after
+      // system restart. (e.g., empty file in data region's data directory)
+      writingFileWriter.getFD().sync();
       // 1. The writing file writer must be closed, otherwise it may cause 
concurrent errors during
       // the process of loading tsfile when parsing tsfile.
       //
@@ -473,7 +477,7 @@ public class PipeConsensusReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod (if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter(tsFileWriter);
+      closeCurrentWritingFileWriter(tsFileWriter, false);
       deleteCurrentWritingFile(tsFileWriter);
     }
   }
@@ -524,6 +528,10 @@ public class PipeConsensusReceiver {
         }
       }
 
+      // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
+      // load the file before the data is written to the disk and cause 
unexpected behavior after
+      // system restart. (e.g., empty file in data region's data directory)
+      writingFileWriter.getFD().sync();
       // 1. The writing file writer must be closed, otherwise it may cause 
concurrent errors during
       // the process of loading tsfile when parsing tsfile.
       //
@@ -582,7 +590,7 @@ public class PipeConsensusReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod(if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter(tsFileWriter);
+      closeCurrentWritingFileWriter(tsFileWriter, false);
       // Clear the directory instead of only deleting the referenced files in 
seal request
       // to avoid previously undeleted file being redundant when transferring 
multi files
       
IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());
@@ -777,10 +785,14 @@ public class PipeConsensusReceiver {
     return !offsetCorrect;
   }
 
-  private void closeCurrentWritingFileWriter(PipeConsensusTsFileWriter 
tsFileWriter) {
+  private void closeCurrentWritingFileWriter(
+      PipeConsensusTsFileWriter tsFileWriter, boolean fsyncAfterClose) {
     if (tsFileWriter.getWritingFileWriter() != null) {
       try {
         tsFileWriter.getWritingFileWriter().close();
+        if (fsyncAfterClose) {
+          tsFileWriter.getWritingFileWriter().getFD().sync();
+        }
         LOGGER.info(
             "PipeConsensus-PipeName-{}: Current writing file writer {} was 
closed.",
             consensusPipeName,
@@ -861,7 +873,7 @@ public class PipeConsensusReceiver {
         fileName,
         tsFileWriter.getWritingFile() == null ? "null" : 
tsFileWriter.getWritingFile().getPath());
 
-    closeCurrentWritingFileWriter(tsFileWriter);
+    closeCurrentWritingFileWriter(tsFileWriter, !isSingleFile);
     // If there are multiple files we can not delete the current file
     // instead they will be deleted after seal request
     if (tsFileWriter.getWritingFile() != null && isSingleFile) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b3162ee7567..c8b8129546f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -193,6 +193,8 @@ public class CommonConfig {
 
   private boolean pipeHardLinkWALEnabled = false;
 
+  private boolean pipeFileReceiverFsyncEnabled = true;
+
   private int pipeRealTimeQueuePollHistoryThreshold = 100;
 
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
@@ -672,6 +674,14 @@ public class CommonConfig {
     this.pipeHardLinkWALEnabled = pipeHardLinkWALEnabled;
   }
 
+  public boolean getPipeFileReceiverFsyncEnabled() {
+    return pipeFileReceiverFsyncEnabled;
+  }
+
+  public void setPipeFileReceiverFsyncEnabled(boolean 
pipeFileReceiverFsyncEnabled) {
+    this.pipeFileReceiverFsyncEnabled = pipeFileReceiverFsyncEnabled;
+  }
+
   public int getPipeDataStructureTabletRowSize() {
     return pipeDataStructureTabletRowSize;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index accbdc89895..4d2799d178b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -266,6 +266,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_hardlink_wal_enabled",
                 Boolean.toString(config.getPipeHardLinkWALEnabled()))));
+    config.setPipeFileReceiverFsyncEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_file_receiver_fsync_enabled",
+                Boolean.toString(config.getPipeFileReceiverFsyncEnabled()))));
 
     config.setPipeDataStructureTabletRowSize(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index c41841264fa..9cb99a226ba 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -54,6 +54,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeHardLinkWALEnabled();
   }
 
+  public boolean getPipeFileReceiverFsyncEnabled() {
+    return COMMON_CONFIG.getPipeFileReceiverFsyncEnabled();
+  }
+
   /////////////////////////////// Tablet ///////////////////////////////
 
   public int getPipeDataStructureTabletRowSize() {
@@ -335,6 +339,7 @@ public class PipeConfig {
     LOGGER.info("PipeHardlinkTsFileDirName: {}", 
getPipeHardlinkTsFileDirName());
     LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
     LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
+    LOGGER.info("PipeFileReceiverFsyncEnabled: {}", 
getPipeFileReceiverFsyncEnabled());
 
     LOGGER.info("PipeDataStructureTabletRowSize: {}", 
getPipeDataStructureTabletRowSize());
     LOGGER.info("PipeDataStructureTabletSizeInBytes: {}", 
getPipeDataStructureTabletSizeInBytes());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 59bcc06c45c..14bd34ebea8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
@@ -72,6 +73,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
   protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 
+  private static final boolean IS_FSYNC_ENABLED =
+      PipeConfig.getInstance().getPipeFileReceiverFsyncEnabled();
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
@@ -337,7 +340,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         fileName,
         writingFile == null ? "null" : writingFile.getPath());
 
-    closeCurrentWritingFileWriter();
+    closeCurrentWritingFileWriter(!isSingleFile);
     // If there are multiple files we can not delete the current file
     // instead they will be deleted after seal request
     if (writingFile != null && isSingleFile) {
@@ -372,9 +375,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     return writingFile != null && writingFile.getName().equals(fileName);
   }
 
-  private void closeCurrentWritingFileWriter() {
+  private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
     if (writingFileWriter != null) {
       try {
+        if (IS_FSYNC_ENABLED && fsyncAfterClose) {
+          writingFileWriter.getFD().sync();
+        }
         writingFileWriter.close();
         LOGGER.info(
             "Receiver id = {}: Current writing file writer {} was closed.",
@@ -467,6 +473,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
 
       final String fileAbsolutePath = writingFile.getAbsolutePath();
 
+      // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
+      // load the file before the data is written to the disk and cause 
unexpected behavior after
+      // system restart. (e.g., empty file in data region's data directory)
+      if (IS_FSYNC_ENABLED) {
+        writingFileWriter.getFD().sync();
+      }
       // 1. The writing file writer must be closed, otherwise it may cause 
concurrent errors during
       // the process of loading tsfile when parsing tsfile.
       //
@@ -508,7 +520,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod (if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter();
+      closeCurrentWritingFileWriter(false);
       deleteCurrentWritingFile();
     }
   }
@@ -543,6 +555,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         }
       }
 
+      // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
+      // load the file before the data is written to the disk and cause 
unexpected behavior after
+      // system restart. (e.g., empty file in data region's data directory)
+      if (IS_FSYNC_ENABLED) {
+        writingFileWriter.getFD().sync();
+      }
       // 1. The writing file writer must be closed, otherwise it may cause 
concurrent errors during
       // the process of loading tsfile when parsing tsfile.
       //
@@ -583,7 +601,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod(if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter();
+      closeCurrentWritingFileWriter(false);
       // Clear the directory instead of only deleting the referenced files in 
seal request
       // to avoid previously undeleted file being redundant when transferring 
multi files
       
IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());

Reply via email to