This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new d218f1edffe Pipe: Optimize lock in PipeTsFileResourceManager using segment lock strategy (#13915) (#13994)
d218f1edffe is described below

commit d218f1edffe29c1ea7424e66505c3886ba0231b7
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Nov 5 18:21:06 2024 +0800

    Pipe: Optimize lock in PipeTsFileResourceManager using segment lock strategy (#13915) (#13994)
    
    (cherry picked from commit b8432f88fcd6fad8b56d6e95b86f17a5b8031824)
---
 .../resource/tsfile/PipeTsFileResourceManager.java | 123 ++++++++++-----------
 .../tsfile/PipeTsFileResourceSegmentLock.java      | 109 ++++++++++++++++++
 2 files changed, 168 insertions(+), 64 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index e7367956126..d3482653f85 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -41,7 +40,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 public class PipeTsFileResourceManager {
 
@@ -49,7 +47,7 @@ public class PipeTsFileResourceManager {
 
   private final Map<String, PipeTsFileResource> 
hardlinkOrCopiedFileToPipeTsFileResourceMap =
       new ConcurrentHashMap<>();
-  private final ReentrantLock lock = new ReentrantLock();
+  private final PipeTsFileResourceSegmentLock segmentLock = new 
PipeTsFileResourceSegmentLock();
 
   public PipeTsFileResourceManager() {
     PipeDataNodeAgent.runtime()
@@ -61,25 +59,20 @@ public class PipeTsFileResourceManager {
 
   private void tryTtlCheck() {
     try {
-      final long timeout = PipePeriodicalJobExecutor.getMinIntervalSeconds() 
>> 1;
-      if (lock.tryLock(timeout, TimeUnit.SECONDS)) {
-        try {
-          ttlCheck();
-        } finally {
-          lock.unlock();
-        }
-      } else {
-        LOGGER.warn("failed to try lock when checking TTL because of timeout 
({}s)", timeout);
-      }
+      ttlCheck();
     } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.warn("failed to try lock when checking TTL because of 
interruption", e);
+    } catch (final Exception e) {
+      LOGGER.warn("failed to check TTL of PipeTsFileResource: ", e);
     }
   }
 
-  private void ttlCheck() {
+  private void ttlCheck() throws InterruptedException {
     final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
         hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
+    final long timeout =
+        
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()
 >> 1;
     final Optional<Logger> logger =
         PipeDataNodeResourceManager.log()
             .schedule(
@@ -91,6 +84,15 @@ public class PipeTsFileResourceManager {
     while (iterator.hasNext()) {
       final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
 
+      final String hardlinkOrCopiedFile = entry.getKey();
+      if (!segmentLock.tryLock(new File(hardlinkOrCopiedFile), timeout, 
TimeUnit.SECONDS)) {
+        LOGGER.warn(
+            "failed to try lock when checking TTL for file {} because of 
timeout ({}s)",
+            hardlinkOrCopiedFile,
+            timeout);
+        continue;
+      }
+
       try {
         if (entry.getValue().closeIfOutOfTimeToLive()) {
           iterator.remove();
@@ -104,6 +106,8 @@ public class PipeTsFileResourceManager {
         }
       } catch (final IOException e) {
         LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", 
e);
+      } finally {
+        segmentLock.unlock(new File(hardlinkOrCopiedFile));
       }
     }
   }
@@ -132,18 +136,23 @@ public class PipeTsFileResourceManager {
   public File increaseFileReference(
       final File file, final boolean isTsFile, final TsFileResource 
tsFileResource)
       throws IOException {
-    lock.lock();
+    // If the file is already a hardlink or copied file,
+    // just increase reference count and return it
+    segmentLock.lock(file);
     try {
-      // If the file is already a hardlink or copied file,
-      // just increase reference count and return it
-      if (increaseReferenceIfExists(file.getPath())) {
+      if (increaseReferenceIfExists(file)) {
         return file;
       }
+    } finally {
+      segmentLock.unlock(file);
+    }
 
-      // If the file is not a hardlink or copied file, check if there is a 
related hardlink or
-      // copied file in pipe dir. if so, increase reference count and return it
-      final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
-      if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
+    // If the file is not a hardlink or copied file, check if there is a 
related hardlink or
+    // copied file in pipe dir. if so, increase reference count and return it
+    final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
+    segmentLock.lock(hardlinkOrCopiedFile);
+    try {
+      if (increaseReferenceIfExists(hardlinkOrCopiedFile)) {
         return hardlinkOrCopiedFileToPipeTsFileResourceMap
             .get(hardlinkOrCopiedFile.getPath())
             .getFile();
@@ -162,12 +171,13 @@ public class PipeTsFileResourceManager {
           resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile, 
tsFileResource));
       return resultFile;
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedFile);
     }
   }
 
-  private boolean increaseReferenceIfExists(final String path) {
-    final PipeTsFileResource resource = 
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
+  private boolean increaseReferenceIfExists(final File file) {
+    final PipeTsFileResource resource =
+        hardlinkOrCopiedFileToPipeTsFileResourceMap.get(file.getPath());
     if (resource != null) {
       resource.increaseAndGetReference();
       return true;
@@ -220,7 +230,7 @@ public class PipeTsFileResourceManager {
    * @param hardlinkOrCopiedFile the copied or hardlinked file
    */
   public void decreaseFileReference(final File hardlinkOrCopiedFile) {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedFile);
     try {
       final String filePath = hardlinkOrCopiedFile.getPath();
       final PipeTsFileResource resource = 
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
@@ -228,7 +238,7 @@ public class PipeTsFileResourceManager {
         resource.decreaseAndGetReference();
       }
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedFile);
     }
   }
 
@@ -239,13 +249,13 @@ public class PipeTsFileResourceManager {
    * @return the reference count of the file
    */
   public int getFileReferenceCount(final File hardlinkOrCopiedFile) {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedFile);
     try {
       final String filePath = hardlinkOrCopiedFile.getPath();
       final PipeTsFileResource resource = 
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
       return resource != null ? resource.getReferenceCount() : 0;
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedFile);
     }
   }
 
@@ -256,94 +266,78 @@ public class PipeTsFileResourceManager {
    *     false} if they can not be cached.
    */
   public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile) 
throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource != null && resource.cacheObjectsIfAbsent();
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(
       final File hardlinkOrCopiedTsFile) throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource == null ? null : resource.tryGetDeviceMeasurementsMap();
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(
       final File hardlinkOrCopiedTsFile, final boolean cacheOtherMetadata) 
throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource == null ? null : 
resource.tryGetDeviceIsAlignedMap(cacheOtherMetadata);
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(
       final File hardlinkOrCopiedTsFile) throws IOException {
-    lock.lock();
+    segmentLock.lock(hardlinkOrCopiedTsFile);
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
       return resource == null ? null : resource.tryGetMeasurementDataTypeMap();
     } finally {
-      lock.unlock();
+      segmentLock.unlock(hardlinkOrCopiedTsFile);
     }
   }
 
   public void pinTsFileResource(final TsFileResource resource, final boolean 
withMods)
       throws IOException {
-    lock.lock();
-    try {
-      increaseFileReference(resource.getTsFile(), true, resource);
-      if (withMods && resource.getModFile().exists()) {
-        increaseFileReference(new File(resource.getModFile().getFilePath()), 
false, null);
-      }
-    } finally {
-      lock.unlock();
+    increaseFileReference(resource.getTsFile(), true, resource);
+    if (withMods && resource.getModFile().exists()) {
+      increaseFileReference(new File(resource.getModFile().getFilePath()), 
false, null);
     }
   }
 
   public void unpinTsFileResource(final TsFileResource resource) throws 
IOException {
-    lock.lock();
-    try {
-      final File pinnedFile = 
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
-      decreaseFileReference(pinnedFile);
+    final File pinnedFile = 
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
+    decreaseFileReference(pinnedFile);
 
-      final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
-      if (modFile.exists()) {
-        decreaseFileReference(modFile);
-      }
-    } finally {
-      lock.unlock();
+    final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
+    if (modFile.exists()) {
+      decreaseFileReference(modFile);
     }
   }
 
   public int getLinkedTsfileCount() {
-    lock.lock();
-    try {
-      return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
-    } finally {
-      lock.unlock();
-    }
+    return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
   }
 
   /**
    * Get the total size of linked TsFiles whose original TsFile is deleted (by 
compaction or else)
    */
   public long getTotalLinkedButDeletedTsfileSize() {
-    lock.lock();
     try {
       return 
hardlinkOrCopiedFileToPipeTsFileResourceMap.values().parallelStream()
           .filter(PipeTsFileResource::isOriginalTsFileDeleted)
@@ -358,8 +352,9 @@ public class PipeTsFileResourceManager {
                 }
               })
           .sum();
-    } finally {
-      lock.unlock();
+    } catch (final Exception e) {
+      LOGGER.warn("failed to get total size of linked but deleted TsFiles: ", 
e);
+      return 0;
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
new file mode 100644
index 00000000000..cc73ee0db38
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.tsfile;
+
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A striped ("segment") lock keyed by {@link File}.
+ *
+ * <p>Each file is mapped, via its {@link File#hashCode()}, onto one entry of a fixed-size array of
+ * {@link ReentrantLock}s. Operations on files that land on different segments do not block each
+ * other; files that land on the same segment share one lock.
+ *
+ * <p>The segment array is created lazily on first use. Its size is taken from the number of data
+ * regions reported by {@link StorageEngine} and clamped to the range [{@value
+ * #SEGMENT_LOCK_MIN_SIZE}, {@value #SEGMENT_LOCK_MAX_SIZE}].
+ */
+public class PipeTsFileResourceSegmentLock {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class);
+
+  private static final int SEGMENT_LOCK_MIN_SIZE = 32;
+  private static final int SEGMENT_LOCK_MAX_SIZE = 128;
+
+  /** Lazily created segment array; once published it is fully populated and never replaced. */
+  private volatile ReentrantLock[] locks;
+
+  /** Creates the segment array on first use. Safe to call from multiple threads. */
+  private void initIfNecessary() {
+    if (locks != null) {
+      return;
+    }
+    synchronized (this) {
+      if (locks != null) {
+        return;
+      }
+
+      int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
+      try {
+        lockSegmentSize = StorageEngine.getInstance().getAllDataRegionIds().size();
+      } catch (final Exception e) {
+        // Best effort: fall back to the minimum size, but keep the cause in the log.
+        LOGGER.warn(
+            "Cannot get data region ids, use default lock segment size: {}", lockSegmentSize, e);
+      }
+      lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
+      lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
+
+      // Fill a local array first and publish it with a single volatile write, so that a thread
+      // which observes a non-null 'locks' without entering the synchronized block can never
+      // read a slot that is still null.
+      final ReentrantLock[] segments = new ReentrantLock[lockSegmentSize];
+      for (int i = 0; i < segments.length; i++) {
+        segments[i] = new ReentrantLock();
+      }
+      locks = segments;
+    }
+  }
+
+  /**
+   * Returns the segment lock responsible for the given file.
+   *
+   * <p>{@link Math#floorMod(int, int)} is used instead of {@code Math.abs(hash) % n} because
+   * {@code Math.abs(Integer.MIN_VALUE)} is negative and would yield a negative array index.
+   */
+  private ReentrantLock segmentOf(final File file) {
+    initIfNecessary();
+    final ReentrantLock[] segments = locks;
+    return segments[Math.floorMod(file.hashCode(), segments.length)];
+  }
+
+  /**
+   * Acquires the segment lock of the given file, blocking until it is available.
+   *
+   * @param file the file whose segment should be locked
+   */
+  public void lock(final File file) {
+    segmentOf(file).lock();
+  }
+
+  /**
+   * Tries to acquire the segment lock of the given file within the given waiting time.
+   *
+   * @param file the file whose segment should be locked
+   * @param timeout the maximum time to wait for the lock
+   * @param timeUnit the unit of {@code timeout}
+   * @return {@code true} if the lock was acquired, {@code false} if the waiting time elapsed
+   * @throws InterruptedException if the current thread is interrupted while waiting
+   */
+  public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit)
+      throws InterruptedException {
+    return segmentOf(file).tryLock(timeout, timeUnit);
+  }
+
+  /**
+   * Tries to acquire every segment lock, in array order. Either all segments end up held by the
+   * current thread, or none of the segments acquired by this call remain held.
+   *
+   * <p>Note that {@code timeout} is applied to each segment individually, so the total waiting
+   * time can be up to {@code timeout} multiplied by the number of segments.
+   *
+   * @param timeout the maximum time to wait for each single segment
+   * @param timeUnit the unit of {@code timeout}
+   * @return {@code true} if all segments were acquired, {@code false} otherwise
+   * @throws InterruptedException if the current thread is interrupted while waiting; segments
+   *     acquired so far by this call are released before the exception propagates
+   */
+  public boolean tryLockAll(final long timeout, final TimeUnit timeUnit)
+      throws InterruptedException {
+    initIfNecessary();
+    int alreadyLocked = 0;
+    boolean allLocked = false;
+    try {
+      for (final ReentrantLock lock : locks) {
+        if (!lock.tryLock(timeout, timeUnit)) {
+          break;
+        }
+        alreadyLocked++;
+      }
+      allLocked = alreadyLocked == locks.length;
+      return allLocked;
+    } finally {
+      // Roll back on timeout and on interruption alike, so no segment is leaked.
+      if (!allLocked) {
+        unlockUntil(alreadyLocked);
+      }
+    }
+  }
+
+  /** Releases the segments with indexes in {@code [0, index)}. */
+  private void unlockUntil(final int index) {
+    for (int i = 0; i < index; i++) {
+      locks[i].unlock();
+    }
+  }
+
+  /**
+   * Releases the segment lock of the given file. Must be called by the thread that holds it.
+   *
+   * @param file the file whose segment should be unlocked
+   */
+  public void unlock(final File file) {
+    segmentOf(file).unlock();
+  }
+
+  /** Releases every segment lock. The current thread must hold all of them. */
+  public void unlockAll() {
+    initIfNecessary();
+    unlockUntil(locks.length);
+  }
+}

Reply via email to