This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 13915 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4dd53a3e8905045ade27d2efff0b3afb34156793 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Nov 5 15:02:36 2024 +0800 Pipe: Optimize lock in PipeTsFileResourceManager using segment lock strategy (#13915) (cherry picked from commit b8432f88fcd6fad8b56d6e95b86f17a5b8031824) --- .../resource/tsfile/PipeTsFileResourceManager.java | 123 ++++++++++----------- .../tsfile/PipeTsFileResourceSegmentLock.java | 109 ++++++++++++++++++ 2 files changed, 168 insertions(+), 64 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index e7367956126..d3482653f85 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; -import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -41,7 +40,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; public class PipeTsFileResourceManager { @@ -49,7 +47,7 @@ public class PipeTsFileResourceManager { private final Map<String, PipeTsFileResource> hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock(); public PipeTsFileResourceManager() { PipeDataNodeAgent.runtime() @@ -61,25 +59,20 @@ public class PipeTsFileResourceManager { private void tryTtlCheck() { try { - final long timeout = PipePeriodicalJobExecutor.getMinIntervalSeconds() >> 1; - if (lock.tryLock(timeout, TimeUnit.SECONDS)) { - try { - ttlCheck(); - } finally { - lock.unlock(); - } - } else { - LOGGER.warn("failed to try lock when checking TTL because of timeout ({}s)", timeout); - } + ttlCheck(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn("failed to try lock when checking TTL because of interruption", e); + } catch (final Exception e) { + LOGGER.warn("failed to check TTL of PipeTsFileResource: ", e); } } - private void ttlCheck() { + private void ttlCheck() throws InterruptedException { final Iterator<Map.Entry<String, PipeTsFileResource>> iterator = hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator(); + final long timeout = + PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds() >> 1; final Optional<Logger> logger = PipeDataNodeResourceManager.log() .schedule( @@ -91,6 +84,15 @@ public class PipeTsFileResourceManager { while (iterator.hasNext()) { final Map.Entry<String, PipeTsFileResource> entry = iterator.next(); + final String hardlinkOrCopiedFile = entry.getKey(); + if (!segmentLock.tryLock(new File(hardlinkOrCopiedFile), timeout, TimeUnit.SECONDS)) { + LOGGER.warn( + "failed to try lock when checking TTL for file {} because of timeout ({}s)", + hardlinkOrCopiedFile, + timeout); + continue; + } + try { if (entry.getValue().closeIfOutOfTimeToLive()) { iterator.remove(); @@ -104,6 +106,8 @@ public class PipeTsFileResourceManager { } } catch (final IOException e) { LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e); + } finally { + segmentLock.unlock(new File(hardlinkOrCopiedFile)); } } } @@ -132,18 +136,23 @@ public class PipeTsFileResourceManager { public File increaseFileReference( final File file, final boolean isTsFile, final TsFileResource tsFileResource) throws IOException { - lock.lock(); + // If the file is already a hardlink or copied file, + // just increase reference count and return it + segmentLock.lock(file); try { - // If the file is already a hardlink or copied file, - // just increase reference count and return it - if (increaseReferenceIfExists(file.getPath())) { + if (increaseReferenceIfExists(file)) { return file; } + } finally { + segmentLock.unlock(file); + } - // If the file is not a hardlink or copied file, check if there is a related hardlink or - // copied file in pipe dir. if so, increase reference count and return it - final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file); - if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) { + // If the file is not a hardlink or copied file, check if there is a related hardlink or + // copied file in pipe dir. if so, increase reference count and return it + final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file); + segmentLock.lock(hardlinkOrCopiedFile); + try { + if (increaseReferenceIfExists(hardlinkOrCopiedFile)) { return hardlinkOrCopiedFileToPipeTsFileResourceMap .get(hardlinkOrCopiedFile.getPath()) .getFile(); @@ -162,12 +171,13 @@ public class PipeTsFileResourceManager { resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile, tsFileResource)); return resultFile; } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedFile); } } - private boolean increaseReferenceIfExists(final String path) { - final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path); + private boolean increaseReferenceIfExists(final File file) { + final PipeTsFileResource resource = + hardlinkOrCopiedFileToPipeTsFileResourceMap.get(file.getPath()); if (resource != null) { resource.increaseAndGetReference(); return true; @@ -220,7 +230,7 @@ public class PipeTsFileResourceManager { * @param hardlinkOrCopiedFile the copied or hardlinked file */ public void decreaseFileReference(final File hardlinkOrCopiedFile) { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedFile); try { final String filePath = hardlinkOrCopiedFile.getPath(); final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath); @@ -228,7 +238,7 @@ public class PipeTsFileResourceManager { resource.decreaseAndGetReference(); } } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedFile); } } @@ -239,13 +249,13 @@ public class PipeTsFileResourceManager { * @return the reference count of the file */ public int getFileReferenceCount(final File hardlinkOrCopiedFile) { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedFile); try { final String filePath = hardlinkOrCopiedFile.getPath(); final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath); return resource != null ? resource.getReferenceCount() : 0; } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedFile); } } @@ -256,94 +266,78 @@ public class PipeTsFileResourceManager { * false} if they can not be cached. */ public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource != null && resource.cacheObjectsIfAbsent(); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache( final File hardlinkOrCopiedTsFile) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource == null ? null : resource.tryGetDeviceMeasurementsMap(); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache( final File hardlinkOrCopiedTsFile, final boolean cacheOtherMetadata) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource == null ? null : resource.tryGetDeviceIsAlignedMap(cacheOtherMetadata); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public Map<String, TSDataType> getMeasurementDataTypeMapFromCache( final File hardlinkOrCopiedTsFile) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource == null ? null : resource.tryGetMeasurementDataTypeMap(); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public void pinTsFileResource(final TsFileResource resource, final boolean withMods) throws IOException { - lock.lock(); - try { - increaseFileReference(resource.getTsFile(), true, resource); - if (withMods && resource.getModFile().exists()) { - increaseFileReference(new File(resource.getModFile().getFilePath()), false, null); - } - } finally { - lock.unlock(); + increaseFileReference(resource.getTsFile(), true, resource); + if (withMods && resource.getModFile().exists()) { + increaseFileReference(new File(resource.getModFile().getFilePath()), false, null); } } public void unpinTsFileResource(final TsFileResource resource) throws IOException { - lock.lock(); - try { - final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()); - decreaseFileReference(pinnedFile); + final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()); + decreaseFileReference(pinnedFile); - final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX); - if (modFile.exists()) { - decreaseFileReference(modFile); - } - } finally { - lock.unlock(); + final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX); + if (modFile.exists()) { + decreaseFileReference(modFile); } } public int getLinkedTsfileCount() { - lock.lock(); - try { - return hardlinkOrCopiedFileToPipeTsFileResourceMap.size(); - } finally { - lock.unlock(); - } + return hardlinkOrCopiedFileToPipeTsFileResourceMap.size(); } /** * Get the total size of linked TsFiles whose original TsFile is deleted (by compaction or else) */ public long getTotalLinkedButDeletedTsfileSize() { - lock.lock(); try { return hardlinkOrCopiedFileToPipeTsFileResourceMap.values().parallelStream() .filter(PipeTsFileResource::isOriginalTsFileDeleted) @@ -358,8 +352,9 @@ public class PipeTsFileResourceManager { } }) .sum(); - } finally { - lock.unlock(); + } catch (final Exception e) { + LOGGER.warn("failed to get total size of linked but deleted TsFiles: ", e); + return 0; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java new file mode 100644 index 00000000000..cc73ee0db38 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.tsfile; + +import org.apache.iotdb.db.storageengine.StorageEngine; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class PipeTsFileResourceSegmentLock { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class); + + private static final int SEGMENT_LOCK_MIN_SIZE = 32; + private static final int SEGMENT_LOCK_MAX_SIZE = 128; + + private volatile ReentrantLock[] locks; + + private void initIfNecessary() { + if (locks == null) { + synchronized (this) { + if (locks == null) { + int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE; + try { + lockSegmentSize = StorageEngine.getInstance().getAllDataRegionIds().size(); + } catch (final Exception e) { + LOGGER.warn( + "Cannot get data region ids, use default lock segment size: {}", lockSegmentSize); + } + lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize); + lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize); + + locks = new ReentrantLock[lockSegmentSize]; + for (int i = 0; i < locks.length; i++) { + locks[i] = new ReentrantLock(); + } + } + } + } + } + + public void lock(final File file) { + initIfNecessary(); + locks[Math.abs(file.hashCode()) % locks.length].lock(); + } + + public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit) + throws InterruptedException { + initIfNecessary(); + return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit); + } + + public boolean tryLockAll(final long timeout, final TimeUnit timeUnit) + throws InterruptedException { + initIfNecessary(); + int alreadyLocked = 0; + for (final ReentrantLock lock : locks) { + if (lock.tryLock(timeout, timeUnit)) { + alreadyLocked++; + } else { + break; + } + } + + if (alreadyLocked == locks.length) { + return true; + } else { + unlockUntil(alreadyLocked); + return false; + } + } + + private void unlockUntil(final int index) { + for (int i = 0; i < index; i++) { + locks[i].unlock(); + } + } + + public void unlock(final File file) { + initIfNecessary(); + locks[Math.abs(file.hashCode()) % locks.length].unlock(); + } + + public void unlockAll() { + initIfNecessary(); + unlockUntil(locks.length); + } +}
