This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 66b0c8209cc62ba3fbdd92ce4f0dca4ec75186ec Author: Caideyipi <[email protected]> AuthorDate: Thu Jul 24 17:02:16 2025 +0800 Pipe: Fixed the hard-link lock problem & Some pipe CIs on master (#16006) (#16012) * Update IoTDBPipePermissionIT.java * refacotr * revert=pom * fix-lock --- .../overview/PipeDataNodeSinglePipeMetrics.java | 6 +-- .../resource/tsfile/PipeTsFileResourceManager.java | 62 +++++++++++----------- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 1cbbae7ec37..c9c343a0667 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -404,18 +404,18 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { //////////////////////////// singleton //////////////////////////// - private static class PipeDataNodeRemainingEventAndTimeMetricsHolder { + private static class PipeDataNodeSinglePipeMetricsHolder { private static final PipeDataNodeSinglePipeMetrics INSTANCE = new PipeDataNodeSinglePipeMetrics(); - private PipeDataNodeRemainingEventAndTimeMetricsHolder() { + private PipeDataNodeSinglePipeMetricsHolder() { // Empty constructor } } public static PipeDataNodeSinglePipeMetrics getInstance() { - return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE; + return PipeDataNodeSinglePipeMetricsHolder.INSTANCE; } private PipeDataNodeSinglePipeMetrics() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 86a5d40d375..fd74dadac5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -92,30 +92,27 @@ public class PipeTsFileResourceManager { throws IOException { // If the file is already a hardlink or copied file, // just increase reference count and return it - segmentLock.lock(file); - try { - if (increaseReferenceIfExists(file, pipeName, isTsFile)) { - return file; - } - } finally { - segmentLock.unlock(file); + if (increaseReferenceIfExists(file, pipeName, isTsFile)) { + return file; } // If the file is not a hardlink or copied file, check if there is a related hardlink or // copied file in pipe dir. if so, increase reference count and return it final File hardlinkOrCopiedFile = Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file; - segmentLock.lock(hardlinkOrCopiedFile); - try { - if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) { - return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile(); - } - // If the file is a tsfile, create a hardlink in pipe dir and will return it. - // otherwise, copy the file (.mod or .resource) to pipe dir and will return it. - final File source = Objects.isNull(sourceFile) ? file : sourceFile; + if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) { + return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile(); + } + + // If the file is a tsfile, create a hardlink in pipe dir and will return it. + // otherwise, copy the file (.mod or .resource) to pipe dir and will return it. + final File source = Objects.isNull(sourceFile) ? file : sourceFile; + final File resultFile; - final File resultFile = + segmentLock.lock(hardlinkOrCopiedFile); + try { + resultFile = isTsFile ? FileUtils.createHardLink(source, hardlinkOrCopiedFile) : FileUtils.copyFile(source, hardlinkOrCopiedFile); @@ -131,25 +128,29 @@ public class PipeTsFileResourceManager { hardlinkOrCopiedFileToTsFilePublicResourceMap.put( resultFile.getPath(), new PipeTsFilePublicResource(resultFile)); } - - increasePublicReference(resultFile, pipeName, isTsFile); - - return resultFile; } finally { segmentLock.unlock(hardlinkOrCopiedFile); } + increasePublicReference(resultFile, pipeName, isTsFile); + return resultFile; } private boolean increaseReferenceIfExists( final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException { - final String path = file.getPath(); - final PipeTsFileResource resource = getResourceMap(pipeName).get(path); - if (resource != null) { - resource.increaseReferenceCount(); - increasePublicReference(file, pipeName, isTsFile); - return true; + segmentLock.lock(file); + try { + final String path = file.getPath(); + final PipeTsFileResource resource = getResourceMap(pipeName).get(path); + if (resource != null) { + resource.increaseReferenceCount(); + } else { + return false; + } + } finally { + segmentLock.unlock(file); } - return false; + increasePublicReference(file, pipeName, isTsFile); + return true; } private void increasePublicReference( @@ -222,12 +223,13 @@ public class PipeTsFileResourceManager { if (resource != null && resource.decreaseReferenceCount()) { getResourceMap(pipeName).remove(filePath); } - // Decrease the assigner's file to clear hard-link and memory cache - // Note that it does not exist for historical files - decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName); } finally { segmentLock.unlock(hardlinkOrCopiedFile); } + + // Decrease the assigner's file to clear hard-link and memory cache + // Note that it does not exist for historical files + decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName); } private void decreasePublicReferenceIfExists(final File file, final @Nullable String pipeName) {
