This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 76a5f5d848c Pipe: decouple pipe receiver directory from the system
directory and add support for multiple folders (#11333)
76a5f5d848c is described below
commit 76a5f5d848c2f4a4262f210a880ab3c5e19463c5
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Oct 18 11:06:55 2023 +0800
Pipe: decouple pipe receiver directory from the system directory and add
support for multiple folders (#11333)
**NOTE:** In https://github.com/apache/iotdb/pull/11318 and THIS PR, it is
assumed that `systemDir` will be initialized with the user-specified value
**before** the call to `getPipeReceiverFileDir(s)`.
---
.../resources/conf/iotdb-datanode.properties | 13 ++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 18 +++++-----
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +++--
.../db/pipe/agent/receiver/PipeReceiverAgent.java | 14 +++++---
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 2 +-
.../receiver/thrift/IoTDBThriftReceiverV1.java | 39 ++++++++++++++++++++--
6 files changed, 77 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
index 2170c6ec004..c8aef63aa1b 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
@@ -223,6 +223,19 @@ dn_target_config_node_list=127.0.0.1:10710
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# sort_tmp_dir=data/datanode/tmp
+# pipe_receiver_file_dirs
+# If this property is unset, system will save the data in the default relative
path directory under the IoTDB folder(i.e.,
%IOTDB_HOME%/${dn_system_dir}/pipe/receiver).
+# If it is absolute, system will save the data in the exact location it points
to.
+# If it is relative, system will save the data in the relative path directory
it indicates under the IoTDB folder.
+# If there are more than one directory, please separate them by commas ",".
+# Note: If pipe_receiver_file_dirs is assigned an empty
string(i.e.,zero-size), it will be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is
"\\\\", then the path is absolute. Otherwise, it is relative.
+# pipe_receiver_file_dirs=data\\datanode\\system\\pipe\\receiver
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# pipe_receiver_file_dirs=data/datanode/system/pipe/receiver
+
####################
### Metric Configuration
####################
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e5594f85f81..9c06c97e7d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1081,7 +1081,7 @@ public class IoTDBConfig {
/** Pipe related */
/** initialized as null, updated based on the latest `systemDir` during
querying */
- private String pipeReceiverFileDir = null;
+ private String[] pipeReceiverFileDirs = null;
/** Resource control */
private boolean quotaEnable = false;
@@ -1214,7 +1214,9 @@ public class IoTDBConfig {
triggerTemporaryLibDir = addDataHomeDir(triggerTemporaryLibDir);
pipeDir = addDataHomeDir(pipeDir);
pipeTemporaryLibDir = addDataHomeDir(pipeTemporaryLibDir);
- pipeReceiverFileDir = addDataHomeDir(pipeReceiverFileDir);
+ for (int i = 0; i < pipeReceiverFileDirs.length; i++) {
+ pipeReceiverFileDirs[i] = addDataHomeDir(pipeReceiverFileDirs[i]);
+ }
mqttDir = addDataHomeDir(mqttDir);
extPipeDir = addDataHomeDir(extPipeDir);
queryDir = addDataHomeDir(queryDir);
@@ -3708,14 +3710,14 @@ public class IoTDBConfig {
return modeMapSizeThreshold;
}
- public void setPipeReceiverFileDir(String pipeReceiverFileDir) {
- this.pipeReceiverFileDir = pipeReceiverFileDir;
+ public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
+ this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
- public String getPipeReceiverFileDir() {
- return Objects.isNull(this.pipeReceiverFileDir)
- ? (systemDir + File.separator + "pipe" + File.separator + "receiver")
- : this.pipeReceiverFileDir;
+ public String[] getPipeReceiverFileDirs() {
+ return Objects.isNull(this.pipeReceiverFileDirs)
+ ? new String[] {systemDir + File.separator + "pipe" + File.separator +
"receiver"}
+ : this.pipeReceiverFileDirs;
}
public boolean isQuotaEnable() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7850bf6b103..2e94b53f2bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1972,8 +1972,12 @@ public class IoTDBDescriptor {
private void loadPipeProps(Properties properties) {
conf.setPipeLibDir(properties.getProperty("pipe_lib_dir",
conf.getPipeLibDir()));
- conf.setPipeReceiverFileDir(
- properties.getProperty("pipe_receiver_file_dir",
conf.getPipeReceiverFileDir()));
+ conf.setPipeReceiverFileDirs(
+ properties
+ .getProperty(
+ "pipe_receiver_file_dirs", String.join(",",
conf.getPipeReceiverFileDirs()))
+ .trim()
+ .split(","));
}
private void loadCQProps(Properties properties) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
index 53967e17757..b2af4f2258f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
/** PipeReceiverAgent is the entry point of all pipe receivers' logic. */
public class PipeReceiverAgent {
@@ -58,10 +59,7 @@ public class PipeReceiverAgent {
return legacyAgent;
}
- public void cleanPipeReceiverDir() {
- final File receiverFileDir =
- new
File(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDir());
-
+ private static void cleanPipeReceiverDir(File receiverFileDir) {
try {
FileUtils.deleteDirectory(receiverFileDir);
LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
@@ -76,4 +74,12 @@ public class PipeReceiverAgent {
LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
}
}
+
+ public void cleanPipeReceiverDirs() {
+ String[] pipeReceiverFileDirs =
+ IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs();
+ Arrays.stream(pipeReceiverFileDirs)
+ .map(File::new)
+ .forEach(PipeReceiverAgent::cleanPipeReceiverDir);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 37f56b7835f..dc50ba94bee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -59,7 +59,7 @@ public class PipeRuntimeAgent implements IService {
PipeHardlinkFileDirStartupCleaner.clean();
// clean receiver file dir
- PipeAgent.receiver().cleanPipeReceiverDir();
+ PipeAgent.receiver().cleanPipeReceiverDirs();
PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
simpleConsensusProgressIndexAssigner.start();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index a3039214073..6d8253e0a40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.pipe.receiver.thrift;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
@@ -49,6 +51,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
+import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -63,6 +67,8 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -73,7 +79,21 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private static final String RECEIVER_FILE_BASE_DIR =
IOTDB_CONFIG.getPipeReceiverFileDir();
+ private static final String[] RECEIVER_FILE_BASE_DIRS =
IOTDB_CONFIG.getPipeReceiverFileDirs();
+ private static FolderManager folderManager = null;
+
+ static {
+ try {
+ folderManager =
+ new FolderManager(
+ Arrays.asList(RECEIVER_FILE_BASE_DIRS),
DirectoryStrategyType.SEQUENCE_STRATEGY);
+ } catch (DiskSpaceInsufficientException e) {
+ LOGGER.error(
+ "Fail to create pipe receiver file folders allocation strategy
because all disks of folders are full.",
+ e);
+ }
+ }
+
private final AtomicReference<File> receiverFileDirWithIdSuffix = new
AtomicReference<>();
// Used to generate transfer id, which is used to identify a receiver thread.
@@ -182,8 +202,23 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
LOGGER.info("Current receiver file dir is null. No need to delete.");
}
+ // get next receiver file base dir by folder manager
+ if (Objects.isNull(folderManager)) {
+ LOGGER.error(
+ "Failed to init pipe receiver file folder manager because all disks
of folders are full.");
+ return new
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
+ }
+ String receiverFileBaseDir;
+ try {
+ receiverFileBaseDir = folderManager.getNextFolder();
+ } catch (DiskSpaceInsufficientException e) {
+ LOGGER.error(
+ "Fail to create pipe receiver file folder because all disks of
folders are full.", e);
+ return new
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
+ }
+
// create a new receiver file dir
- final File newReceiverDir = new File(RECEIVER_FILE_BASE_DIR,
Long.toString(receiverId.get()));
+ final File newReceiverDir = new File(receiverFileBaseDir,
Long.toString(receiverId.get()));
if (!newReceiverDir.exists()) {
if (newReceiverDir.mkdirs()) {
LOGGER.info("Receiver file dir {} was created.",
newReceiverDir.getPath());