This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new 1b1ef58b0 [#4129] improvement(core): Support hold multiple tree lock within a thread at the same time (#4130) 1b1ef58b0 is described below commit 1b1ef58b0fe9f9f84575319fbd953d8ea3351f61 Author: Qi Yu <y...@datastrato.com> AuthorDate: Fri Jul 12 10:09:06 2024 +0800 [#4129] improvement(core): Support hold multiple tree lock within a thread at the same time (#4130) ### What changes were proposed in this pull request? Add the value of the name identifier in the holdingThreadTimestamp to support holding multiple tree locks at the same time. ### Why are the changes needed? To support more user scenarios Fix: #4129 ### Does this PR introduce _any_ user-facing change? N/A. ### How was this patch tested? Add new test class `TestTreeLockUtils` --- .../com/datastrato/gravitino/lock/LockManager.java | 6 +- .../com/datastrato/gravitino/lock/TreeLock.java | 21 +++++- .../datastrato/gravitino/lock/TreeLockNode.java | 82 +++++++++++++++------- .../gravitino/lock/TestTreeLockUtils.java | 51 ++++++++++++++ 4 files changed, 131 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java index b1dbb27fe..9fb0ef6e1 100644 --- a/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java +++ b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java @@ -132,12 +132,12 @@ public class LockManager { // Check self node.getHoldingThreadTimestamp() .forEach( - (thread, ts) -> { + (threadIdentifier, ts) -> { // If the thread is holding the lock for more than 30 seconds, we will log it. 
if (System.currentTimeMillis() - ts > 30000) { LOG.warn( - "Dead lock detected for thread {} on node {}, threads that holding the node: {} ", - thread, + "Dead lock detected for thread with identifier {} on node {}, threads that holding the node: {} ", + threadIdentifier, node, node.getHoldingThreadTimestamp()); } diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java index 76d9ab028..02cb0c757 100644 --- a/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java +++ b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java @@ -104,8 +104,17 @@ public class TreeLock { try { treeLockNode.lock(type); heldLocks.push(Pair.of(treeLockNode, type)); + + treeLockNode.addHoldingThreadTimestamp( + Thread.currentThread(), identifier, System.currentTimeMillis()); if (LOG.isTraceEnabled()) { - LOG.trace("Locked node: {}, lock type: {}", treeLockNode, type); + LOG.trace( + "Node {} has been lock with '{}' lock, hold by {} with ident '{}' at {}", + this, + lockType, + Thread.currentThread(), + identifier, + System.currentTimeMillis()); } } catch (Exception e) { LOG.error( @@ -140,8 +149,16 @@ public class TreeLock { TreeLockNode current = pair.getLeft(); LockType type = pair.getRight(); current.unlock(type); + + long holdStartTime = current.removeHoldingThreadTimestamp(Thread.currentThread(), identifier); if (LOG.isTraceEnabled()) { - LOG.trace("Unlocked node: {}, lock type: {}", current, type); + LOG.trace( + "Node {} has been unlock with '{}' lock, hold by {} with ident '{}' for {} ms", + this, + lockType, + Thread.currentThread(), + identifier, + System.currentTimeMillis() - holdStartTime); } } diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java index a4953c541..92db979aa 100644 --- a/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java +++ 
b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java @@ -19,6 +19,7 @@ package com.datastrato.gravitino.lock; +import com.datastrato.gravitino.NameIdentifier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.collect.Lists; @@ -44,13 +45,60 @@ public class TreeLockNode { private final String name; private final ReentrantReadWriteLock readWriteLock; @VisibleForTesting final Map<String, TreeLockNode> childMap; - private final Map<Thread, Long> holdingThreadTimestamp = new ConcurrentHashMap<>(); + + private final Map<ThreadIdentifier, Long> holdingThreadTimestamp = new ConcurrentHashMap<>(); // The reference count of this node. The reference count is used to track the number of the // TreeLocks that are using this node. If the reference count is 0, it means that no TreeLock is // using this node, and this node can be removed from the tree. private final AtomicLong referenceCount = new AtomicLong(); + /** + * The identifier of a thread. This class is used to identify this tree lock node is held by which + * thread and identifier because one thread can hold multiple tree lock nodes at the same time. + * For example, a thread can hold the lock of the root node and the lock of the child node at the + * same time. 
+ */ + static class ThreadIdentifier { + private final Thread thread; + private final NameIdentifier ident; + + public ThreadIdentifier(Thread thread, NameIdentifier ident) { + this.thread = thread; + this.ident = ident; + } + + static ThreadIdentifier of(Thread thread, NameIdentifier identifier) { + return new ThreadIdentifier(thread, identifier); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof ThreadIdentifier)) { + return false; + } + ThreadIdentifier that = (ThreadIdentifier) o; + return Objects.equal(thread, that.thread) && Objects.equal(ident, that.ident); + } + + @Override + public int hashCode() { + return Objects.hashCode(thread, ident); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ThreadIdentifier{"); + sb.append("thread=").append(thread); + sb.append(", ident=").append(ident); + sb.append('}'); + return sb.toString(); + } + } + protected TreeLockNode(String name) { this.name = name; this.readWriteLock = new ReentrantReadWriteLock(); @@ -61,10 +109,18 @@ public class TreeLockNode { return name; } - Map<Thread, Long> getHoldingThreadTimestamp() { + Map<ThreadIdentifier, Long> getHoldingThreadTimestamp() { return holdingThreadTimestamp; } + void addHoldingThreadTimestamp(Thread currentThread, NameIdentifier identifier, long timestamp) { + holdingThreadTimestamp.put(ThreadIdentifier.of(currentThread, identifier), timestamp); + } + + long removeHoldingThreadTimestamp(Thread currentThread, NameIdentifier identifier) { + return holdingThreadTimestamp.remove(ThreadIdentifier.of(currentThread, identifier)); + } + /** * Increase the reference count of this node. The reference count should always be greater than or * equal to 0. 
@@ -97,17 +153,6 @@ public class TreeLockNode { } else { readWriteLock.writeLock().lock(); } - - holdingThreadTimestamp.put(Thread.currentThread(), System.currentTimeMillis()); - if (LOG.isTraceEnabled()) { - LOG.trace( - "Node {} has been lock with '{}' lock, hold by {} at {}, current holding threads: {}", - this, - lockType, - Thread.currentThread(), - System.currentTimeMillis(), - holdingThreadTimestamp); - } } /** @@ -125,17 +170,6 @@ public class TreeLockNode { } this.referenceCount.decrementAndGet(); - - long holdStartTime = holdingThreadTimestamp.remove(Thread.currentThread()); - if (LOG.isTraceEnabled()) { - LOG.trace( - "Node {} has been unlock with '{}' lock, hold by {} for {} ms, current holding threads: {}", - this, - lockType, - Thread.currentThread(), - System.currentTimeMillis() - holdStartTime, - holdingThreadTimestamp); - } } /** diff --git a/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java new file mode 100644 index 000000000..43fb22443 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.datastrato.gravitino.lock; + +import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.GravitinoEnv; +import com.datastrato.gravitino.NameIdentifier; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.junit.jupiter.api.Test; + +public class TestTreeLockUtils { + + @Test + void testHolderMultipleLock() throws Exception { + Config config = mock(Config.class); + doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY); + doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY); + doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL); + FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true); + + TreeLockUtils.doWithTreeLock( + NameIdentifier.of("test"), + LockType.READ, + () -> + TreeLockUtils.doWithTreeLock( + NameIdentifier.of("test", "test1"), LockType.WRITE, () -> null)); + } +}