This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch seg-lock-for-pipe-resource
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b5f953c1dea096a61be333af43ab334e0fa7e7e4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Oct 25 14:17:16 2024 +0800

    Pipe: Optimize lock in PipeTsFileResourceManager using segment lock strategy
---
 .../resource/tsfile/PipeTsFileResourceManager.java | 129 ++++++++++-----------
 .../tsfile/PipeTsFileResourceSegmentLock.java      | 109 +++++++++++++++++
 2 files changed, 171 insertions(+), 67 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index bb8bc948de8..dfdf6ddc2fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -40,7 +40,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 public class PipeTsFileResourceManager {
 
@@ -48,7 +47,7 @@ public class PipeTsFileResourceManager {
 
   private final Map<String, PipeTsFileResource> 
hardlinkOrCopiedFileToPipeTsFileResourceMap =
       new ConcurrentHashMap<>();
-  private final ReentrantLock lock = new ReentrantLock();
+  private final PipeTsFileResourceSegmentLock segmentLock = new 
PipeTsFileResourceSegmentLock();
 
   public PipeTsFileResourceManager() {
     PipeDataNodeAgent.runtime()
@@ -62,11 +61,11 @@ public class PipeTsFileResourceManager {
     try {
       final long timeout =
           
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()
 >> 1;
-      if (lock.tryLock(timeout, TimeUnit.SECONDS)) {
+      if (segmentLock.tryLockAll(timeout, TimeUnit.SECONDS)) {
         try {
           ttlCheck();
         } finally {
-          lock.unlock();
+          segmentLock.unlockAll();
         }
       } else {
         LOGGER.warn("failed to try lock when checking TTL because of timeout 
({}s)", timeout);
@@ -132,29 +131,34 @@ public class PipeTsFileResourceManager {
   public File increaseFileReference(
       final File file, final boolean isTsFile, final TsFileResource 
tsFileResource)
       throws IOException {
-    lock.lock();
-    try {
-      // If the file is already a hardlink or copied file,
-      // just increase reference count and return it
-      if (increaseReferenceIfExists(file.getPath())) {
-        return file;
-      }
+    // If the file is already a hardlink or copied file,
+    // just increase reference count and return it
+    if (increaseReferenceIfExists(file)) {
+      return file;
+    }
 
-      // If the file is not a hardlink or copied file, check if there is a 
related hardlink or
-      // copied file in pipe dir. if so, increase reference count and return it
-      final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
-      if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
+    // If the file is not a hardlink or copied file, check if there is a 
related hardlink or
+    // copied file in pipe dir. if so, increase reference count and return it
+    final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
+    segmentLock.lock(hardlinkOrCopiedFile);
+    try {
+      if (increaseReferenceIfExists(hardlinkOrCopiedFile)) {
         return hardlinkOrCopiedFileToPipeTsFileResourceMap
             .get(hardlinkOrCopiedFile.getPath())
             .getFile();
       }
+    } finally {
+      segmentLock.unlock(hardlinkOrCopiedFile);
+    }
 
-      // If the file is a tsfile, create a hardlink in pipe dir and will 
return it.
-      // otherwise, copy the file (.mod or .resource) to pipe dir and will 
return it.
-      final File resultFile =
-          isTsFile
-              ? FileUtils.createHardLink(file, hardlinkOrCopiedFile)
-              : FileUtils.copyFile(file, hardlinkOrCopiedFile);
+    // If the file is a tsfile, create a hardlink in pipe dir and will return 
it.
+    // otherwise, copy the file (.mod or .resource) to pipe dir and will 
return it.
+    final File resultFile =
+        isTsFile
+            ? FileUtils.createHardLink(file, hardlinkOrCopiedFile)
+            : FileUtils.copyFile(file, hardlinkOrCopiedFile);
+    segmentLock.lock(resultFile);
+    try {
       // If the file is not a hardlink or copied file, and there is no related 
hardlink or copied
       // file in pipe dir, create a hardlink or copy it to pipe dir, maintain 
a reference count for
       // the hardlink or copied file, and return the hardlink or copied file.
@@ -162,17 +166,23 @@ public class PipeTsFileResourceManager {
           resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile, 
tsFileResource));
       return resultFile;
     } finally {
-      lock.unlock();
+      segmentLock.unlock(resultFile);
     }
   }
 
-  private boolean increaseReferenceIfExists(final String path) {
-    final PipeTsFileResource resource = 
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
-    if (resource != null) {
-      resource.increaseAndGetReference();
-      return true;
+  private boolean increaseReferenceIfExists(final File file) {
+    segmentLock.lock(file);
+    try {
+      final PipeTsFileResource resource =
+          hardlinkOrCopiedFileToPipeTsFileResourceMap.get(file.getPath());
+      if (resource != null) {
+        resource.increaseAndGetReference();
+        return true;
+      }
+      return false;
+    } finally {
+      segmentLock.unlock(file);
     }
-    return false;
   }
 
   public static File getHardlinkOrCopiedFileInPipeDir(final File file) throws 
IOException {
@@ -220,7 +230,7 @@ public class PipeTsFileResourceManager {
    * @param hardlinkOrCopiedFile the copied or hardlinked file
    */
   public void decreaseFileReference(final File hardlinkOrCopiedFile) {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedFile);
     try {
       final String filePath = hardlinkOrCopiedFile.getPath();
       final PipeTsFileResource resource = 
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
@@ -228,7 +238,7 @@ public class PipeTsFileResourceManager {
         resource.decreaseAndGetReference();
       }
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedFile);
     }
   }
 
@@ -239,13 +249,13 @@ public class PipeTsFileResourceManager {
    * @return the reference count of the file
    */
   public int getFileReferenceCount(final File hardlinkOrCopiedFile) {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedFile);
     try {
       final String filePath = hardlinkOrCopiedFile.getPath();
       final PipeTsFileResource resource = 
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
       return resource != null ? resource.getReferenceCount() : 0;
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedFile);
     }
   }
 
@@ -256,94 +266,78 @@ public class PipeTsFileResourceManager {
    *     false} if they can not be cached.
    */
   public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile) 
throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource != null && resource.cacheObjectsIfAbsent();
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(
       final File hardlinkOrCopiedTsFile) throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource == null ? null : resource.tryGetDeviceMeasurementsMap();
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(
       final File hardlinkOrCopiedTsFile, final boolean cacheOtherMetadata) 
throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource == null ? null : 
resource.tryGetDeviceIsAlignedMap(cacheOtherMetadata);
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(
       final File hardlinkOrCopiedTsFile) throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource == null ? null : resource.tryGetMeasurementDataTypeMap();
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public void pinTsFileResource(final TsFileResource resource, final boolean 
withMods)
       throws IOException {
-    lock.lock();
-    try {
-      increaseFileReference(resource.getTsFile(), true, resource);
-      if (withMods && resource.getModFile().exists()) {
-        increaseFileReference(new File(resource.getModFile().getFilePath()), 
false, null);
-      }
-    } finally {
-      lock.unlock();
+    increaseFileReference(resource.getTsFile(), true, resource);
+    if (withMods && resource.getModFile().exists()) {
+      increaseFileReference(new File(resource.getModFile().getFilePath()), 
false, null);
     }
   }
 
   public void unpinTsFileResource(final TsFileResource resource) throws 
IOException {
-    lock.lock();
-    try {
-      final File pinnedFile = 
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
-      decreaseFileReference(pinnedFile);
+    final File pinnedFile = 
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
+    decreaseFileReference(pinnedFile);
 
-      final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
-      if (modFile.exists()) {
-        decreaseFileReference(modFile);
-      }
-    } finally {
-      lock.unlock();
+    final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
+    if (modFile.exists()) {
+      decreaseFileReference(modFile);
     }
   }
 
   public int getLinkedTsfileCount() {
-    lock.lock();
-    try {
-      return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
-    } finally {
-      lock.unlock();
-    }
+    return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
   }
 
   /**
    * Get the total size of linked TsFiles whose original TsFile is deleted (by 
compaction or else)
    */
   public long getTotalLinkedButDeletedTsfileSize() {
-    lock.lock();
     try {
       return 
hardlinkOrCopiedFileToPipeTsFileResourceMap.values().parallelStream()
           .filter(PipeTsFileResource::isOriginalTsFileDeleted)
@@ -358,8 +352,9 @@ public class PipeTsFileResourceManager {
                 }
               })
           .sum();
-    } finally {
-      lock.unlock();
+    } catch (final Exception e) {
+      LOGGER.warn("failed to get total size of linked but deleted TsFiles: ", 
e);
+      return 0;
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
new file mode 100644
index 00000000000..cc73ee0db38
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.tsfile;
+
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PipeTsFileResourceSegmentLock {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class);
+
+  private static final int SEGMENT_LOCK_MIN_SIZE = 32;
+  private static final int SEGMENT_LOCK_MAX_SIZE = 128;
+
+  private volatile ReentrantLock[] locks;
+
+  private void initIfNecessary() {
+    if (locks == null) {
+      synchronized (this) {
+        if (locks == null) {
+          int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
+          try {
+            lockSegmentSize = 
StorageEngine.getInstance().getAllDataRegionIds().size();
+          } catch (final Exception e) {
+            LOGGER.warn(
+                "Cannot get data region ids, use default lock segment size: 
{}", lockSegmentSize);
+          }
+          lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
+          lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
+
+          locks = new ReentrantLock[lockSegmentSize];
+          for (int i = 0; i < locks.length; i++) {
+            locks[i] = new ReentrantLock();
+          }
+        }
+      }
+    }
+  }
+
+  public void lock(final File file) {
+    initIfNecessary();
+    locks[Math.abs(file.hashCode()) % locks.length].lock();
+  }
+
+  public boolean tryLock(final File file, final long timeout, final TimeUnit 
timeUnit)
+      throws InterruptedException {
+    initIfNecessary();
+    return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, 
timeUnit);
+  }
+
+  public boolean tryLockAll(final long timeout, final TimeUnit timeUnit)
+      throws InterruptedException {
+    initIfNecessary();
+    int alreadyLocked = 0;
+    for (final ReentrantLock lock : locks) {
+      if (lock.tryLock(timeout, timeUnit)) {
+        alreadyLocked++;
+      } else {
+        break;
+      }
+    }
+
+    if (alreadyLocked == locks.length) {
+      return true;
+    } else {
+      unlockUntil(alreadyLocked);
+      return false;
+    }
+  }
+
+  private void unlockUntil(final int index) {
+    for (int i = 0; i < index; i++) {
+      locks[i].unlock();
+    }
+  }
+
+  public void unlock(final File file) {
+    initIfNecessary();
+    locks[Math.abs(file.hashCode()) % locks.length].unlock();
+  }
+
+  public void unlockAll() {
+    initIfNecessary();
+    unlockUntil(locks.length);
+  }
+}

Reply via email to