Author: ozeigermann
Date: Mon Jul 23 10:40:10 2007
New Revision: 558810
URL: http://svn.apache.org/viewvc?view=rev&rev=558810
Log:
Added first version of lock manager that supports deadlock detection.
This has not been properly tested and still misses reasonable thread-safety
protection.
Modified:
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java
Modified:
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java?view=diff&rev=558810&r1=558809&r2=558810
==============================================================================
---
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
(original)
+++
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/LockException.java
Mon Jul 23 10:40:10 2007
@@ -43,7 +43,7 @@
/**
* Locking request canceled because of deadlock.
*/
- DEADLOCK_VICTIM,
+ WOULD_DEADLOCK,
/**
* A conflict between two optimistic transactions occured.
Modified:
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java
URL:
http://svn.apache.org/viewvc/jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java?view=diff&rev=558810&r1=558809&r2=558810
==============================================================================
---
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java
(original)
+++
jakarta/commons/proper/transaction/branches/TRANSACTION_2/src/java/org/apache/commons/transaction/locking/RWLockManager.java
Mon Jul 23 10:40:10 2007
@@ -16,10 +16,13 @@
*/
package org.apache.commons.transaction.locking;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -31,45 +34,15 @@
protected ConcurrentHashMap<KeyEntry<K, M>, ReadWriteLock> locks = new
ConcurrentHashMap<KeyEntry<K, M>, ReadWriteLock>();
- protected Map<Thread, Set<Lock>> locksForThreads = new
ConcurrentHashMap<Thread, Set<Lock>>();
+ protected Map<Thread, CopyOnWriteArraySet<Lock>> locksForThreads = new
ConcurrentHashMap<Thread, CopyOnWriteArraySet<Lock>>();
- protected Map<ReadWriteLock, Set<Thread>> threadsForLocks = new
ConcurrentHashMap<ReadWriteLock, Set<Thread>>();
+ protected ConcurrentHashMap<Lock, Set<Thread>> threadsForLocks = new
ConcurrentHashMap<Lock, Set<Thread>>();
protected Map<Thread, Long> effectiveGlobalTimeouts = new
ConcurrentHashMap<Thread, Long>();
- // TODO
- public Iterable<ReadWriteLock> orderLocks() {
- Set<Lock> locks = locksForThreads.get(Thread.currentThread());
- if (locks == null) {
- throw new IllegalStateException("lock() can only be called after
startWork()");
- }
-
- return null;
-
- }
-
@Override
public void endWork() {
- Set<Lock> locks = locksForThreads.get(Thread.currentThread());
- // graceful reaction...
- if (locks == null) {
- return;
- }
- for (Lock lock : locks) {
- lock.unlock();
-
- // FIXME: We need to do this atomically
- Set<Thread> threadsForThisLock = threadsForLocks.get(lock);
- if (threadsForThisLock != null) {
- threadsForThisLock.remove(Thread.currentThread());
- if (threadsForThisLock.isEmpty()) {
- threadsForLocks.remove(lock);
- locks.remove(lock);
- }
- }
- }
-
- locksForThreads.remove(Thread.currentThread());
+ release(Thread.currentThread());
}
@Override
@@ -77,7 +50,7 @@
if (isWorking()) {
throw new IllegalStateException("work has already been started");
}
- locksForThreads.put(Thread.currentThread(), new HashSet<Lock>());
+ locksForThreads.put(Thread.currentThread(), new
CopyOnWriteArraySet<Lock>());
long timeoutMSecs = unit.toMillis(timeout);
long now = System.currentTimeMillis();
@@ -91,8 +64,8 @@
}
- protected long computeRemainingTime() {
- long timeout = effectiveGlobalTimeouts.get(Thread.currentThread());
+ protected long computeRemainingTime(Thread thread) {
+ long timeout = effectiveGlobalTimeouts.get(thread);
long now = System.currentTimeMillis();
long remaining = timeout - now;
return remaining;
@@ -147,10 +120,8 @@
@Override
public void lock(M managedResource, K key, boolean exclusive) throws
LockException {
- long remainingTime = computeRemainingTime();
- if (remainingTime < 0) {
- throw new LockException(LockException.Code.TIMED_OUT);
- }
+ long remainingTime = computeRemainingTime(Thread.currentThread());
+
boolean locked = tryLockInternal(managedResource, key, exclusive,
remainingTime,
TimeUnit.MILLISECONDS);
if (!locked) {
@@ -165,6 +136,8 @@
protected boolean tryLockInternal(M managedResource, K key, boolean
exclusive, long time,
TimeUnit unit) throws LockException {
+ reportTimeout(Thread.currentThread());
+
KeyEntry<K, M> entry = new KeyEntry<K, M>(key, managedResource);
ReadWriteLock rwlock = putIfAbsent(entry, create());
Set<Lock> locks = locksForThreads.get(Thread.currentThread());
@@ -174,20 +147,171 @@
Lock lock = exclusive ? rwlock.writeLock() : rwlock.readLock();
+ boolean locked;
+ if (time == 0) {
+ locked = lock.tryLock();
+ } else {
+ locked =
doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(lock,
unit.toMillis(time));
+ }
+ if (locked) {
+ locks.add(lock);
+ Set<Thread> threads = threadsForLocks.get(lock);
+ if (threads == null) {
+ threads = new HashSet<Thread>();
+ Set<Thread> concurrentlyInsertedThreads = threadsForLocks
+ .putIfAbsent(lock, threads);
+ if (concurrentlyInsertedThreads != null)
+ threads = concurrentlyInsertedThreads;
+ }
+ threads.add(Thread.currentThread());
+ }
+ return locked;
+ }
+
+ protected boolean
doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(Lock lock,
+ long timeMsecs) throws LockException {
+
+ // This algorithm is devided into three parts:
+ // Note: We can be interrupted most of the time
+ //
+ // I prewait:
+ // Wait a fraktion of the time to see if we can acquire
+ // the lock in short time. If we can all is good and we exit
+ // signalling success. If not we need to get into a more resource
+ // consuming phase.
+ //
+ // II clearing of timed out thtreads / deadlock detection:
+ // As we have not been able to acquire the lock, yet, maybe there is
+ // deadlock. Clear all threads already timed out and afterwards
+ // check for a deadlock state. If there is one report it with an
+ // exception. If not we enter the final phase.
+ //
+ // III real wait:
+ // Everything is under control, we were just a little bit too
+ // impatient. So wait for the remaining time and see if the can get
+ // the lock
+ //
+
try {
boolean locked;
- if (time == 0) {
- locked = lock.tryLock();
- } else {
- locked = lock.tryLock(time, unit);
- }
- if (locked) {
- locks.add(lock);
+
+ // I prewait
+
+ long startTime = System.currentTimeMillis();
+
+ // TODO this heuristic devisor really should be configurable
+ long preWaitTime = timeMsecs / 5;
+ locked = lock.tryLock(preWaitTime, TimeUnit.MILLISECONDS);
+ if (locked)
+ return true;
+
+ // II deadlock detect
+ cancelAllTimedOut();
+ if (wouldDeadlock(Thread.currentThread(), new HashSet<Thread>())) {
+ throw new LockException(LockException.Code.WOULD_DEADLOCK);
}
+
+ // III real wait
+ long now = System.currentTimeMillis();
+ long remainingWaitTime = timeMsecs - (now - startTime);
+ if (remainingWaitTime < 0)
+ return false;
+
+ locked = lock.tryLock(remainingWaitTime, TimeUnit.MILLISECONDS);
return locked;
} catch (InterruptedException e) {
- throw new LockException(Code.INTERRUPTED, key);
+ throw new LockException(Code.INTERRUPTED);
}
+
+ }
+
+ protected boolean wouldDeadlock(Thread thread, Set<Thread> path) {
+ path.add(thread);
+ // these are our locks
+ // Note: No need to make a copy as we can be sure to iterate on our
+ // private
+ // version, as this is a CopyOnWriteArraySet!
+ CopyOnWriteArraySet<Lock> locks = locksForThreads.get(thread);
+ for (Lock lock : locks) {
+ // these are the ones waiting for one of our locks
+ // and if they wait, they wait because of me!
+ Collection<Thread> conflicts =
getConflictingWaiters((ReentrantReadWriteLock) lock);
+ for (Thread conflictThread : conflicts) {
+ // this means, we have found a cycle in the wait graph
+ if (path.contains(conflictThread)) {
+ return true;
+ } else if (wouldDeadlock(conflictThread, path)) {
+ return true;
+ }
+ }
+ }
+
+ path.remove(thread);
+ return false;
+ }
+
+ protected Collection<Thread> getConflictingWaiters(ReentrantReadWriteLock
lock) {
+ Collection<Thread> result = new ArrayList<Thread>();
+ // Consider every thread that holds at least one lock!
+ // Caution: We can not use "threadsForLocks" as the waiting threads
+ // have not yet acquired the lock and thus are not part of the map.
+ // An alternative algorithm could also remember the threads waiting for
+ // a lock
+ Collection<Thread> threadsWithLocks = locksForThreads.keySet();
+ for (Thread thread : threadsWithLocks) {
+ if (lock.hasQueuedThread(thread)) {
+ result.add(thread);
+ }
+ }
+ return result;
+ }
+
+ protected void reportTimeout(Thread thread) throws LockException {
+ if (hasTimedOut(thread)) {
+ throw new LockException(LockException.Code.TIMED_OUT);
+ }
+ }
+
+ protected void cancelAllTimedOut() {
+ Set<Thread> threads = effectiveGlobalTimeouts.keySet();
+ for (Thread thread : threads) {
+ if (hasTimedOut(thread)) {
+ // TODO: We need to record this thread has timed out to produce
+ // a meaningful exception when it tries to continue its work
+ release(thread);
+ thread.interrupt();
+ }
+
+ }
+ }
+
+ protected boolean hasTimedOut(Thread thread) {
+ long remainingTime = computeRemainingTime(thread);
+ return (remainingTime < 0);
+
+ }
+
+ protected void release(Thread thread) {
+ Set<Lock> locks = locksForThreads.get(thread);
+ // graceful reaction...
+ if (locks == null) {
+ return;
+ }
+ for (Lock lock : locks) {
+ lock.unlock();
+
+ // FIXME: We need to do this atomically
+ Set<Thread> threadsForThisLock = threadsForLocks.get(lock);
+ if (threadsForThisLock != null) {
+ threadsForThisLock.remove(Thread.currentThread());
+ if (threadsForThisLock.isEmpty()) {
+ threadsForLocks.remove(lock);
+ locks.remove(lock);
+ }
+ }
+ }
+
+ locksForThreads.remove(thread);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]