This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aabccb74fee3f7a9c42f1f8185e7b7aa7a47682d Author: Caideyipi <[email protected]> AuthorDate: Tue Jul 22 09:34:19 2025 +0800 Pipe: Fixed some errors in cherry-picking (#15984) * Update TsFileProcessor.java * fix-cp-2 * fix-fix * Update PipeTsFileResourceManager.java * Update PipeTsFileResourceManager.java (cherry picked from commit f076824eb057f50e8e8800bdd7e709da7868e48c) --- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 6 --- .../realtime/assigner/PipeDataRegionAssigner.java | 9 ---- .../resource/tsfile/PipeTsFileResourceManager.java | 60 +++++++++++----------- .../dataregion/memtable/TsFileProcessor.java | 7 --- 4 files changed, 31 insertions(+), 51 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 9b091a88f51..329a123c489 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -557,12 +557,6 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) : 0) + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0) + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0) - + (Objects.nonNull(treeModelDatabaseName) - ? RamUsageEstimator.sizeOf(treeModelDatabaseName) - : 0) - + (Objects.nonNull(tableModelDatabaseName) - ? RamUsageEstimator.sizeOf(tableModelDatabaseName) - : 0) + (Objects.nonNull(tableNames) ? SET_SIZE + tableNames.stream().mapToLong(RamUsageEstimator::sizeOf).reduce(0L, Long::sum) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 6238117ed89..366a17bdd79 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -20,9 +20,6 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner; import org.apache.iotdb.commons.consensus.DataRegionId; -import org.apache.iotdb.commons.consensus.index.ProgressIndex; -import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; @@ -51,14 +48,11 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; public class PipeDataRegionAssigner implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionAssigner.class); - private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); - /** * The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the * pattern. @@ -72,9 +66,6 @@ public class PipeDataRegionAssigner implements Closeable { private Boolean isTableModel; - private final AtomicReference<ProgressIndex> maxProgressIndexForRealtimeEvent = - new AtomicReference<>(MinimumProgressIndex.INSTANCE); - private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); public String getDataRegionId() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 31bc403b5b9..d1d1286991b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -55,6 +55,11 @@ public class PipeTsFileResourceManager { hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap<>(); private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock(); + public File increaseFileReference( + final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException { + return increaseFileReference(file, isTsFile, pipeName, null); + } + /** * Given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the * hardlink or copied file, and return the hardlink or copied file. @@ -71,16 +76,24 @@ public class PipeTsFileResourceManager { * @param file tsfile, resource file or mod file. can be original file or hardlink/copy of * original file * @param isTsFile {@code true} to create hardlink, {@code false} to copy file + * @param pipeName Nonnull if the pipe is from historical or assigner -> extractors, null if is + * dataRegion -> assigner + * @param sourceFile for inner use, historical extractor will use this to create hardlink from + * pipe tsFile -> common tsFile * @return the hardlink or copied file * @throws IOException when create hardlink or copy file failed */ - public File increaseFileReference( - final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException { + private File increaseFileReference( + final File file, + final boolean isTsFile, + final @Nullable String pipeName, + final @Nullable File sourceFile) + throws IOException { // If the file is already a hardlink or copied file, // just increase reference count and return it segmentLock.lock(file); try { - if (increaseReferenceIfExists(file, pipeName)) { + if (increaseReferenceIfExists(file, pipeName, isTsFile)) { return file; } } finally { @@ -89,19 +102,22 @@ public class PipeTsFileResourceManager { // If the file is not a hardlink or copied file, check if there is a related hardlink or // copied file in pipe dir. if so, increase reference count and return it - final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file, pipeName); + final File hardlinkOrCopiedFile = + Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file; segmentLock.lock(hardlinkOrCopiedFile); try { - if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName)) { + if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) { return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile(); } // If the file is a tsfile, create a hardlink in pipe dir and will return it. // otherwise, copy the file (.mod or .resource) to pipe dir and will return it. + final File source = Objects.isNull(sourceFile) ? file : sourceFile; + final File resultFile = isTsFile - ? FileUtils.createHardLink(file, hardlinkOrCopiedFile) - : FileUtils.copyFile(file, hardlinkOrCopiedFile); + ? FileUtils.createHardLink(source, hardlinkOrCopiedFile) + : FileUtils.copyFile(source, hardlinkOrCopiedFile); // If the file is not a hardlink or copied file, and there is no related hardlink or copied // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for @@ -115,7 +131,7 @@ public class PipeTsFileResourceManager { resultFile.getPath(), new PipeTsFilePublicResource(resultFile)); } - increasePublicReference(resultFile, pipeName); + increasePublicReference(resultFile, pipeName, isTsFile); return resultFile; } finally { @@ -123,34 +139,26 @@ public class PipeTsFileResourceManager { } } - private boolean increaseReferenceIfExists(final File file, final @Nullable String pipeName) { + private boolean increaseReferenceIfExists( + final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException { final String path = file.getPath(); final PipeTsFileResource resource = getResourceMap(pipeName).get(path); if (resource != null) { resource.increaseReferenceCount(); - increasePublicReference(file, pipeName); + increasePublicReference(file, pipeName, isTsFile); return true; } return false; } - private void increasePublicReference(final File file, final String pipeName) { + private void increasePublicReference( + final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException { if (Objects.isNull(pipeName)) { return; } // Increase the assigner's file to avoid hard-link or memory cache cleaning // Note that it does not exist for historical files - final String path = getCommonFilePath(file); - hardlinkOrCopiedFileToTsFilePublicResourceMap.compute( - path, - (k, v) -> { - if (Objects.isNull(v)) { - return new PipeTsFilePublicResource(new File(path)); - } else { - v.increaseReferenceCount(); - return v; - } - }); + increaseFileReference(new File(getCommonFilePath(file)), isTsFile, null, file); } public static File getHardlinkOrCopiedFileInPipeDir( @@ -227,13 +235,7 @@ public class PipeTsFileResourceManager { } // Increase the assigner's file to avoid hard-link or memory cache cleaning // Note that it does not exist for historical files - final String commonFilePath = getCommonFilePath(file); - if (hardlinkOrCopiedFileToTsFilePublicResourceMap.containsKey(commonFilePath) - && hardlinkOrCopiedFileToTsFilePublicResourceMap - .get(commonFilePath) - .decreaseReferenceCount()) { - hardlinkOrCopiedFileToPipeTsFileResourceMap.remove(commonFilePath); - } + decreaseFileReference(new File(getCommonFilePath(file)), null); } // Warning: Shall not be called by the assigner diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index cdf66b2da3c..9ac6f05df0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1302,13 +1302,6 @@ public class TsFileProcessor { tsFileResource.setGeneratedByPipe(isTotallyGeneratedByPipe.get()); try { - PipeInsertionDataNodeListener.getInstance() - .listenToTsFile( - dataRegionInfo.getDataRegion().getDataRegionId(), - dataRegionInfo.getDataRegion().getDatabaseName(), - tsFileResource, - false); - // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke // flushing memTable in System module. Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);
