This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 376162a3ad9 Pipe: Fixed the hard-link lock problem & Some pipe CIs on
master (#16006)
376162a3ad9 is described below
commit 376162a3ad9a9e93cf44b13737251b7d5f8129ef
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 16:11:13 2025 +0800
Pipe: Fixed the hard-link lock problem & Some pipe CIs on master (#16006)
* Update IoTDBPipePermissionIT.java
* refactor
* revert-pom
* fix-lock
---
.../manual/basic/IoTDBPipePermissionIT.java | 6 ++-
.../overview/PipeDataNodeSinglePipeMetrics.java | 6 +--
.../resource/tsfile/PipeTsFileResourceManager.java | 62 +++++++++++-----------
3 files changed, 40 insertions(+), 34 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 890a90cdb53..f991b9bb78e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -207,7 +207,11 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
return;
}
- TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test");
+ try {
+ TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test");
+ } catch (final Exception ignore) {
+ // Ignore because the db/table may be transferred because sender user
may see these
+ }
// Exception, block here
TableModelUtils.assertCountDataAlwaysOnEnv("test", "test", 0, receiverEnv);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 1840093a347..65efae61714 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -404,18 +404,18 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
//////////////////////////// singleton ////////////////////////////
- private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
+ private static class PipeDataNodeSinglePipeMetricsHolder {
private static final PipeDataNodeSinglePipeMetrics INSTANCE =
new PipeDataNodeSinglePipeMetrics();
- private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
+ private PipeDataNodeSinglePipeMetricsHolder() {
// Empty constructor
}
}
public static PipeDataNodeSinglePipeMetrics getInstance() {
- return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
+ return PipeDataNodeSinglePipeMetricsHolder.INSTANCE;
}
private PipeDataNodeSinglePipeMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index d1d1286991b..c1e4ff9ddf1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -91,30 +91,27 @@ public class PipeTsFileResourceManager {
throws IOException {
// If the file is already a hardlink or copied file,
// just increase reference count and return it
- segmentLock.lock(file);
- try {
- if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
- return file;
- }
- } finally {
- segmentLock.unlock(file);
+ if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
+ return file;
}
// If the file is not a hardlink or copied file, check if there is a
related hardlink or
// copied file in pipe dir. if so, increase reference count and return it
final File hardlinkOrCopiedFile =
Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file,
pipeName) : file;
- segmentLock.lock(hardlinkOrCopiedFile);
- try {
- if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile))
{
- return
getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
- }
- // If the file is a tsfile, create a hardlink in pipe dir and will
return it.
- // otherwise, copy the file (.mod or .resource) to pipe dir and will
return it.
- final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+ if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
+ return
getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
+ }
+
+ // If the file is a tsfile, create a hardlink in pipe dir and will return
it.
+ // otherwise, copy the file (.mod or .resource) to pipe dir and will
return it.
+ final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+ final File resultFile;
- final File resultFile =
+ segmentLock.lock(hardlinkOrCopiedFile);
+ try {
+ resultFile =
isTsFile
? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
: FileUtils.copyFile(source, hardlinkOrCopiedFile);
@@ -130,25 +127,29 @@ public class PipeTsFileResourceManager {
hardlinkOrCopiedFileToTsFilePublicResourceMap.put(
resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
}
-
- increasePublicReference(resultFile, pipeName, isTsFile);
-
- return resultFile;
} finally {
segmentLock.unlock(hardlinkOrCopiedFile);
}
+ increasePublicReference(resultFile, pipeName, isTsFile);
+ return resultFile;
}
private boolean increaseReferenceIfExists(
final File file, final @Nullable String pipeName, final boolean
isTsFile) throws IOException {
- final String path = file.getPath();
- final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
- if (resource != null) {
- resource.increaseReferenceCount();
- increasePublicReference(file, pipeName, isTsFile);
- return true;
+ segmentLock.lock(file);
+ try {
+ final String path = file.getPath();
+ final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
+ if (resource != null) {
+ resource.increaseReferenceCount();
+ } else {
+ return false;
+ }
+ } finally {
+ segmentLock.unlock(file);
}
- return false;
+ increasePublicReference(file, pipeName, isTsFile);
+ return true;
}
private void increasePublicReference(
@@ -221,12 +222,13 @@ public class PipeTsFileResourceManager {
if (resource != null && resource.decreaseReferenceCount()) {
getResourceMap(pipeName).remove(filePath);
}
- // Decrease the assigner's file to clear hard-link and memory cache
- // Note that it does not exist for historical files
- decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
} finally {
segmentLock.unlock(hardlinkOrCopiedFile);
}
+
+ // Decrease the assigner's file to clear hard-link and memory cache
+ // Note that it does not exist for historical files
+ decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
}
private void decreasePublicReferenceIfExists(final File file, final
@Nullable String pipeName) {