This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new d218f1edffe Pipe: Optimize lock in PipeTsFileResourceManager using
segment lock strategy (#13915) (#13994)
d218f1edffe is described below
commit d218f1edffe29c1ea7424e66505c3886ba0231b7
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Nov 5 18:21:06 2024 +0800
Pipe: Optimize lock in PipeTsFileResourceManager using segment lock
strategy (#13915) (#13994)
(cherry picked from commit b8432f88fcd6fad8b56d6e95b86f17a5b8031824)
---
.../resource/tsfile/PipeTsFileResourceManager.java | 123 ++++++++++-----------
.../tsfile/PipeTsFileResourceSegmentLock.java | 109 ++++++++++++++++++
2 files changed, 168 insertions(+), 64 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index e7367956126..d3482653f85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -41,7 +40,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
public class PipeTsFileResourceManager {
@@ -49,7 +47,7 @@ public class PipeTsFileResourceManager {
private final Map<String, PipeTsFileResource>
hardlinkOrCopiedFileToPipeTsFileResourceMap =
new ConcurrentHashMap<>();
- private final ReentrantLock lock = new ReentrantLock();
+ private final PipeTsFileResourceSegmentLock segmentLock = new
PipeTsFileResourceSegmentLock();
public PipeTsFileResourceManager() {
PipeDataNodeAgent.runtime()
@@ -61,25 +59,20 @@ public class PipeTsFileResourceManager {
private void tryTtlCheck() {
try {
- final long timeout = PipePeriodicalJobExecutor.getMinIntervalSeconds()
>> 1;
- if (lock.tryLock(timeout, TimeUnit.SECONDS)) {
- try {
- ttlCheck();
- } finally {
- lock.unlock();
- }
- } else {
- LOGGER.warn("failed to try lock when checking TTL because of timeout
({}s)", timeout);
- }
+ ttlCheck();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("failed to try lock when checking TTL because of
interruption", e);
+ } catch (final Exception e) {
+ LOGGER.warn("failed to check TTL of PipeTsFileResource: ", e);
}
}
- private void ttlCheck() {
+ private void ttlCheck() throws InterruptedException {
final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
+ final long timeout =
+
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()
>> 1;
final Optional<Logger> logger =
PipeDataNodeResourceManager.log()
.schedule(
@@ -91,6 +84,15 @@ public class PipeTsFileResourceManager {
while (iterator.hasNext()) {
final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
+ final String hardlinkOrCopiedFile = entry.getKey();
+ if (!segmentLock.tryLock(new File(hardlinkOrCopiedFile), timeout,
TimeUnit.SECONDS)) {
+ LOGGER.warn(
+ "failed to try lock when checking TTL for file {} because of
timeout ({}s)",
+ hardlinkOrCopiedFile,
+ timeout);
+ continue;
+ }
+
try {
if (entry.getValue().closeIfOutOfTimeToLive()) {
iterator.remove();
@@ -104,6 +106,8 @@ public class PipeTsFileResourceManager {
}
} catch (final IOException e) {
LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ",
e);
+ } finally {
+ segmentLock.unlock(new File(hardlinkOrCopiedFile));
}
}
}
@@ -132,18 +136,23 @@ public class PipeTsFileResourceManager {
public File increaseFileReference(
final File file, final boolean isTsFile, final TsFileResource
tsFileResource)
throws IOException {
- lock.lock();
+ // If the file is already a hardlink or copied file,
+ // just increase reference count and return it
+ segmentLock.lock(file);
try {
- // If the file is already a hardlink or copied file,
- // just increase reference count and return it
- if (increaseReferenceIfExists(file.getPath())) {
+ if (increaseReferenceIfExists(file)) {
return file;
}
+ } finally {
+ segmentLock.unlock(file);
+ }
- // If the file is not a hardlink or copied file, check if there is a
related hardlink or
- // copied file in pipe dir. if so, increase reference count and return it
- final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
- if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
+ // If the file is not a hardlink or copied file, check if there is a
related hardlink or
+ // copied file in pipe dir. if so, increase reference count and return it
+ final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
+ segmentLock.lock(hardlinkOrCopiedFile);
+ try {
+ if (increaseReferenceIfExists(hardlinkOrCopiedFile)) {
return hardlinkOrCopiedFileToPipeTsFileResourceMap
.get(hardlinkOrCopiedFile.getPath())
.getFile();
@@ -162,12 +171,13 @@ public class PipeTsFileResourceManager {
resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile,
tsFileResource));
return resultFile;
} finally {
- lock.unlock();
+ segmentLock.unlock(hardlinkOrCopiedFile);
}
}
- private boolean increaseReferenceIfExists(final String path) {
- final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
+ private boolean increaseReferenceIfExists(final File file) {
+ final PipeTsFileResource resource =
+ hardlinkOrCopiedFileToPipeTsFileResourceMap.get(file.getPath());
if (resource != null) {
resource.increaseAndGetReference();
return true;
@@ -220,7 +230,7 @@ public class PipeTsFileResourceManager {
* @param hardlinkOrCopiedFile the copied or hardlinked file
*/
public void decreaseFileReference(final File hardlinkOrCopiedFile) {
- lock.lock();
+ segmentLock.lock(hardlinkOrCopiedFile);
try {
final String filePath = hardlinkOrCopiedFile.getPath();
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
@@ -228,7 +238,7 @@ public class PipeTsFileResourceManager {
resource.decreaseAndGetReference();
}
} finally {
- lock.unlock();
+ segmentLock.unlock(hardlinkOrCopiedFile);
}
}
@@ -239,13 +249,13 @@ public class PipeTsFileResourceManager {
* @return the reference count of the file
*/
public int getFileReferenceCount(final File hardlinkOrCopiedFile) {
- lock.lock();
+ segmentLock.lock(hardlinkOrCopiedFile);
try {
final String filePath = hardlinkOrCopiedFile.getPath();
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
return resource != null ? resource.getReferenceCount() : 0;
} finally {
- lock.unlock();
+ segmentLock.unlock(hardlinkOrCopiedFile);
}
}
@@ -256,94 +266,78 @@ public class PipeTsFileResourceManager {
* false} if they can not be cached.
*/
public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile)
throws IOException {
- lock.lock();
+ segmentLock.lock(hardlinkOrCopiedTsFile);
try {
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
return resource != null && resource.cacheObjectsIfAbsent();
} finally {
- lock.unlock();
+ segmentLock.unlock(hardlinkOrCopiedTsFile);
}
}
public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(
final File hardlinkOrCopiedTsFile) throws IOException {
- lock.lock();
+ segmentLock.lock(hardlinkOrCopiedTsFile);
try {
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
return resource == null ? null : resource.tryGetDeviceMeasurementsMap();
} finally {
- lock.unlock();
+ segmentLock.unlock(hardlinkOrCopiedTsFile);
}
}
public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(
final File hardlinkOrCopiedTsFile, final boolean cacheOtherMetadata)
throws IOException {
- lock.lock();
+ segmentLock.lock(hardlinkOrCopiedTsFile);
try {
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
return resource == null ? null :
resource.tryGetDeviceIsAlignedMap(cacheOtherMetadata);
} finally {
- lock.unlock();
+ segmentLock.unlock(hardlinkOrCopiedTsFile);
}
}
public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(
final File hardlinkOrCopiedTsFile) throws IOException {
- lock.lock();
+ segmentLock.lock(hardlinkOrCopiedTsFile);
try {
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
return resource == null ? null : resource.tryGetMeasurementDataTypeMap();
} finally {
- lock.unlock();
+ segmentLock.unlock(hardlinkOrCopiedTsFile);
}
}
public void pinTsFileResource(final TsFileResource resource, final boolean
withMods)
throws IOException {
- lock.lock();
- try {
- increaseFileReference(resource.getTsFile(), true, resource);
- if (withMods && resource.getModFile().exists()) {
- increaseFileReference(new File(resource.getModFile().getFilePath()),
false, null);
- }
- } finally {
- lock.unlock();
+ increaseFileReference(resource.getTsFile(), true, resource);
+ if (withMods && resource.getModFile().exists()) {
+ increaseFileReference(new File(resource.getModFile().getFilePath()),
false, null);
}
}
public void unpinTsFileResource(final TsFileResource resource) throws
IOException {
- lock.lock();
- try {
- final File pinnedFile =
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
- decreaseFileReference(pinnedFile);
+ final File pinnedFile =
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
+ decreaseFileReference(pinnedFile);
- final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
- if (modFile.exists()) {
- decreaseFileReference(modFile);
- }
- } finally {
- lock.unlock();
+ final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
+ if (modFile.exists()) {
+ decreaseFileReference(modFile);
}
}
public int getLinkedTsfileCount() {
- lock.lock();
- try {
- return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
- } finally {
- lock.unlock();
- }
+ return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
}
/**
* Get the total size of linked TsFiles whose original TsFile is deleted (by
compaction or else)
*/
public long getTotalLinkedButDeletedTsfileSize() {
- lock.lock();
try {
return
hardlinkOrCopiedFileToPipeTsFileResourceMap.values().parallelStream()
.filter(PipeTsFileResource::isOriginalTsFileDeleted)
@@ -358,8 +352,9 @@ public class PipeTsFileResourceManager {
}
})
.sum();
- } finally {
- lock.unlock();
+ } catch (final Exception e) {
+ LOGGER.warn("failed to get total size of linked but deleted TsFiles: ",
e);
+ return 0;
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
new file mode 100644
index 00000000000..cc73ee0db38
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.tsfile;
+
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Segment (striped) lock keyed by {@link File}.
+ *
+ * <p>Each file is mapped by its hash code onto one of a fixed number of
+ * {@link ReentrantLock}s, so operations on files in different segments do not block
+ * each other. Distinct files may share a segment, so holding the lock of one file
+ * can also block callers that work on another file.
+ *
+ * <p>The lock array is created lazily on first use. Its size is the number of data
+ * regions reported by {@link StorageEngine}, clamped to the range
+ * [SEGMENT_LOCK_MIN_SIZE, SEGMENT_LOCK_MAX_SIZE]; the minimum is used when the data
+ * region ids cannot be obtained.
+ */
+public class PipeTsFileResourceSegmentLock {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class);
+
+  private static final int SEGMENT_LOCK_MIN_SIZE = 32;
+  private static final int SEGMENT_LOCK_MAX_SIZE = 128;
+
+  // Assigned exactly once by initIfNecessary(), and only after every slot is filled.
+  private volatile ReentrantLock[] locks;
+
+  /** Creates the lock array on first use; later calls return immediately. */
+  private void initIfNecessary() {
+    if (locks == null) {
+      synchronized (this) {
+        if (locks == null) {
+          int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
+          try {
+            lockSegmentSize = StorageEngine.getInstance().getAllDataRegionIds().size();
+          } catch (final Exception e) {
+            LOGGER.warn(
+                "Cannot get data region ids, use default lock segment size: {}",
+                lockSegmentSize,
+                e);
+          }
+          lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
+          lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
+
+          // Fill a local array first and publish it last: assigning the volatile field
+          // before the slots are set would let a concurrent caller pass the unsynchronized
+          // null check above and dereference a null lock.
+          final ReentrantLock[] segments = new ReentrantLock[lockSegmentSize];
+          for (int i = 0; i < segments.length; i++) {
+            segments[i] = new ReentrantLock();
+          }
+          locks = segments;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the lock guarding the segment that {@code file} belongs to.
+   *
+   * <p>{@link Math#floorMod(int, int)} is used rather than {@code Math.abs(hash) % n}
+   * because {@code Math.abs(Integer.MIN_VALUE)} is negative and would produce a negative
+   * array index.
+   */
+  private ReentrantLock getSegment(final File file) {
+    initIfNecessary();
+    final ReentrantLock[] segments = locks;
+    return segments[Math.floorMod(file.hashCode(), segments.length)];
+  }
+
+  /**
+   * Blocks until the segment lock of {@code file} is acquired.
+   *
+   * @param file the file whose segment should be locked
+   */
+  public void lock(final File file) {
+    getSegment(file).lock();
+  }
+
+  /**
+   * Tries to acquire the segment lock of {@code file}, waiting at most the given time.
+   *
+   * @param file the file whose segment should be locked
+   * @param timeout the maximum time to wait
+   * @param timeUnit the unit of {@code timeout}
+   * @return {@code true} if the lock was acquired, {@code false} if the wait timed out
+   * @throws InterruptedException if the current thread is interrupted while waiting
+   */
+  public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit)
+      throws InterruptedException {
+    return getSegment(file).tryLock(timeout, timeUnit);
+  }
+
+  /**
+   * Tries to acquire every segment lock, in array order.
+   *
+   * <p>The timeout applies to each segment separately, so the total wait can be up to
+   * {@code timeout} multiplied by the number of segments. If any segment cannot be
+   * acquired, or the thread is interrupted, the segments acquired so far are released.
+   *
+   * @param timeout the maximum time to wait for each segment
+   * @param timeUnit the unit of {@code timeout}
+   * @return {@code true} if all segments are now held by the current thread
+   * @throws InterruptedException if the current thread is interrupted while waiting
+   */
+  public boolean tryLockAll(final long timeout, final TimeUnit timeUnit)
+      throws InterruptedException {
+    initIfNecessary();
+    int alreadyLocked = 0;
+    boolean allLocked = false;
+    try {
+      for (final ReentrantLock lock : locks) {
+        if (!lock.tryLock(timeout, timeUnit)) {
+          break;
+        }
+        alreadyLocked++;
+      }
+      allLocked = alreadyLocked == locks.length;
+      return allLocked;
+    } finally {
+      // Also runs when tryLock throws InterruptedException, so no segment stays locked.
+      if (!allLocked) {
+        unlockUntil(alreadyLocked);
+      }
+    }
+  }
+
+  /** Unlocks the segments at indexes {@code [0, index)}. */
+  private void unlockUntil(final int index) {
+    for (int i = 0; i < index; i++) {
+      locks[i].unlock();
+    }
+  }
+
+  /**
+   * Releases the segment lock of {@code file}; the current thread must hold it.
+   *
+   * @param file the file whose segment should be unlocked
+   */
+  public void unlock(final File file) {
+    getSegment(file).unlock();
+  }
+
+  /** Releases every segment lock; the current thread must hold all of them. */
+  public void unlockAll() {
+    initIfNecessary();
+    unlockUntil(locks.length);
+  }
+}