This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch seg-lock-for-pipe-resource in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b5f953c1dea096a61be333af43ab334e0fa7e7e4 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Oct 25 14:17:16 2024 +0800 Pipe: Optimize lock in PipeTsFileResourceManager using segment lock strategy --- .../resource/tsfile/PipeTsFileResourceManager.java | 129 ++++++++++----------- .../tsfile/PipeTsFileResourceSegmentLock.java | 109 +++++++++++++++++ 2 files changed, 171 insertions(+), 67 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index bb8bc948de8..dfdf6ddc2fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -40,7 +40,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; public class PipeTsFileResourceManager { @@ -48,7 +47,7 @@ public class PipeTsFileResourceManager { private final Map<String, PipeTsFileResource> hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock(); public PipeTsFileResourceManager() { PipeDataNodeAgent.runtime() @@ -62,11 +61,11 @@ public class PipeTsFileResourceManager { try { final long timeout = PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds() >> 1; - if (lock.tryLock(timeout, TimeUnit.SECONDS)) { + if (segmentLock.tryLockAll(timeout, TimeUnit.SECONDS)) { try { ttlCheck(); } finally { - lock.unlock(); + segmentLock.unlockAll(); } } else { LOGGER.warn("failed to try lock when checking TTL because of timeout ({}s)", timeout); @@ -132,29 +131,34 @@ public class PipeTsFileResourceManager { public File increaseFileReference( final File file, final boolean isTsFile, final TsFileResource tsFileResource) throws IOException { - lock.lock(); - try { - // If the file is already a hardlink or copied file, - // just increase reference count and return it - if (increaseReferenceIfExists(file.getPath())) { - return file; - } + // If the file is already a hardlink or copied file, + // just increase reference count and return it + if (increaseReferenceIfExists(file)) { + return file; + } - // If the file is not a hardlink or copied file, check if there is a related hardlink or - // copied file in pipe dir. if so, increase reference count and return it - final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file); - if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) { + // If the file is not a hardlink or copied file, check if there is a related hardlink or + // copied file in pipe dir. if so, increase reference count and return it + final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file); + segmentLock.lock(hardlinkOrCopiedFile); + try { + if (increaseReferenceIfExists(hardlinkOrCopiedFile)) { return hardlinkOrCopiedFileToPipeTsFileResourceMap .get(hardlinkOrCopiedFile.getPath()) .getFile(); } + } finally { + segmentLock.unlock(hardlinkOrCopiedFile); + } - // If the file is a tsfile, create a hardlink in pipe dir and will return it. - // otherwise, copy the file (.mod or .resource) to pipe dir and will return it. - final File resultFile = - isTsFile - ? FileUtils.createHardLink(file, hardlinkOrCopiedFile) - : FileUtils.copyFile(file, hardlinkOrCopiedFile); + // If the file is a tsfile, create a hardlink in pipe dir and will return it. + // otherwise, copy the file (.mod or .resource) to pipe dir and will return it. + final File resultFile = + isTsFile + ? FileUtils.createHardLink(file, hardlinkOrCopiedFile) + : FileUtils.copyFile(file, hardlinkOrCopiedFile); + segmentLock.lock(resultFile); + try { // If the file is not a hardlink or copied file, and there is no related hardlink or copied // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for // the hardlink or copied file, and return the hardlink or copied file. @@ -162,17 +166,23 @@ public class PipeTsFileResourceManager { resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile, tsFileResource)); return resultFile; } finally { - lock.unlock(); + segmentLock.unlock(resultFile); } } - private boolean increaseReferenceIfExists(final String path) { - final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path); - if (resource != null) { - resource.increaseAndGetReference(); - return true; + private boolean increaseReferenceIfExists(final File file) { + segmentLock.lock(file); + try { + final PipeTsFileResource resource = + hardlinkOrCopiedFileToPipeTsFileResourceMap.get(file.getPath()); + if (resource != null) { + resource.increaseAndGetReference(); + return true; + } + return false; + } finally { + segmentLock.unlock(file); } - return false; } public static File getHardlinkOrCopiedFileInPipeDir(final File file) throws IOException { @@ -220,7 +230,7 @@ public class PipeTsFileResourceManager { * @param hardlinkOrCopiedFile the copied or hardlinked file */ public void decreaseFileReference(final File hardlinkOrCopiedFile) { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedFile); try { final String filePath = hardlinkOrCopiedFile.getPath(); final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath); @@ -228,7 +238,7 @@ public class PipeTsFileResourceManager { resource.decreaseAndGetReference(); } } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedFile); } } @@ -239,13 +249,13 @@ public class PipeTsFileResourceManager { * @return the reference count of the file */ public int getFileReferenceCount(final File hardlinkOrCopiedFile) { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedFile); try { final String filePath = hardlinkOrCopiedFile.getPath(); final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath); return resource != null ? resource.getReferenceCount() : 0; } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedFile); } } @@ -256,94 +266,78 @@ public class PipeTsFileResourceManager { * false} if they can not be cached. */ public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource != null && resource.cacheObjectsIfAbsent(); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache( final File hardlinkOrCopiedTsFile) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource == null ? null : resource.tryGetDeviceMeasurementsMap(); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache( final File hardlinkOrCopiedTsFile, final boolean cacheOtherMetadata) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource == null ? null : resource.tryGetDeviceIsAlignedMap(cacheOtherMetadata); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public Map<String, TSDataType> getMeasurementDataTypeMapFromCache( final File hardlinkOrCopiedTsFile) throws IOException { - lock.lock(); + segmentLock.lock(hardlinkOrCopiedTsFile); try { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath()); return resource == null ? null : resource.tryGetMeasurementDataTypeMap(); } finally { - lock.unlock(); + segmentLock.unlock(hardlinkOrCopiedTsFile); } } public void pinTsFileResource(final TsFileResource resource, final boolean withMods) throws IOException { - lock.lock(); - try { - increaseFileReference(resource.getTsFile(), true, resource); - if (withMods && resource.getModFile().exists()) { - increaseFileReference(new File(resource.getModFile().getFilePath()), false, null); - } - } finally { - lock.unlock(); + increaseFileReference(resource.getTsFile(), true, resource); + if (withMods && resource.getModFile().exists()) { + increaseFileReference(new File(resource.getModFile().getFilePath()), false, null); } } public void unpinTsFileResource(final TsFileResource resource) throws IOException { - lock.lock(); - try { - final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()); - decreaseFileReference(pinnedFile); + final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()); + decreaseFileReference(pinnedFile); - final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX); - if (modFile.exists()) { - decreaseFileReference(modFile); - } - } finally { - lock.unlock(); + final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX); + if (modFile.exists()) { + decreaseFileReference(modFile); } } public int getLinkedTsfileCount() { - lock.lock(); - try { - return hardlinkOrCopiedFileToPipeTsFileResourceMap.size(); - } finally { - lock.unlock(); - } + return hardlinkOrCopiedFileToPipeTsFileResourceMap.size(); } /** * Get the total size of linked TsFiles whose original TsFile is deleted (by compaction or else) */ public long getTotalLinkedButDeletedTsfileSize() { - lock.lock(); try { return hardlinkOrCopiedFileToPipeTsFileResourceMap.values().parallelStream() .filter(PipeTsFileResource::isOriginalTsFileDeleted) @@ -358,8 +352,9 @@ public class PipeTsFileResourceManager { } }) .sum(); - } finally { - lock.unlock(); + } catch (final Exception e) { + LOGGER.warn("failed to get total size of linked but deleted TsFiles: ", e); + return 0; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java new file mode 100644 index 00000000000..cc73ee0db38 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.tsfile; + +import org.apache.iotdb.db.storageengine.StorageEngine; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class PipeTsFileResourceSegmentLock { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class); + + private static final int SEGMENT_LOCK_MIN_SIZE = 32; + private static final int SEGMENT_LOCK_MAX_SIZE = 128; + + private volatile ReentrantLock[] locks; + + private void initIfNecessary() { + if (locks == null) { + synchronized (this) { + if (locks == null) { + int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE; + try { + lockSegmentSize = StorageEngine.getInstance().getAllDataRegionIds().size(); + } catch (final Exception e) { + LOGGER.warn( + "Cannot get data region ids, use default lock segment size: {}", lockSegmentSize); + } + lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize); + lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize); + + locks = new ReentrantLock[lockSegmentSize]; + for (int i = 0; i < locks.length; i++) { + locks[i] = new ReentrantLock(); + } + } + } + } + } + + public void lock(final File file) { + initIfNecessary(); + locks[Math.abs(file.hashCode()) % locks.length].lock(); + } + + public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit) + throws InterruptedException { + initIfNecessary(); + return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit); + } + + public boolean tryLockAll(final long timeout, final TimeUnit timeUnit) + throws InterruptedException { + initIfNecessary(); + int alreadyLocked = 0; + for (final ReentrantLock lock : locks) { + if (lock.tryLock(timeout, timeUnit)) { + alreadyLocked++; + } else { + break; + } + } + + if (alreadyLocked == locks.length) { + return true; + } else { + unlockUntil(alreadyLocked); + return false; + } + } + + private void unlockUntil(final int index) { + for (int i = 0; i < index; i++) { + locks[i].unlock(); + } + } + + public void unlock(final File file) { + initIfNecessary(); + locks[Math.abs(file.hashCode()) % locks.length].unlock(); + } + + public void unlockAll() { + initIfNecessary(); + unlockUntil(locks.length); + } +}
