Author: tabish
Date: Mon Jun 4 21:26:06 2012
New Revision: 1346165
URL: http://svn.apache.org/viewvc?rev=1346165&view=rev
Log:
Initial implementation of ReentrantReadWriteLock
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp?rev=1346165&r1=1346164&r2=1346165&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp
Mon Jun 4 21:26:06 2012
@@ -18,10 +18,15 @@
#include "ReentrantReadWriteLock.h"
#include <decaf/lang/Exception.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/ThreadLocal.h>
+#include <decaf/lang/exceptions/IllegalMonitorStateException.h>
#include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
using namespace decaf;
using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::locks;
@@ -29,6 +34,27 @@ using namespace decaf::util::concurrent:
////////////////////////////////////////////////////////////////////////////////
namespace {
+ /**
+ * A counter for per-thread read hold counts. Maintained as a ThreadLocal;
+ * cached in cachedHoldCounter in class Sync
+ */
+ struct HoldCounter {
+ int count;
+ Thread* thread;
+
+ HoldCounter() : count(0), thread(Thread::currentThread()) {}
+ };
+
+ class ThreadLocalHoldCounter : public ThreadLocal<HoldCounter> {
+ public:
+
+ virtual ~ThreadLocalHoldCounter() {}
+
+ virtual HoldCounter initialValue() const {
+ return HoldCounter();
+ }
+ };
+
class Sync : public AbstractQueuedSynchronizer {
private:
@@ -44,7 +70,7 @@ namespace {
static const int MAX_COUNT;
static const int EXCLUSIVE_MASK;
- protected:
+ public:
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) {
@@ -56,8 +82,368 @@ namespace {
return c & EXCLUSIVE_MASK;
}
+ private:
+
+ /**
+ * The number of reentrant read locks held by current thread.
+ * Initialized only in the constructor.
+ * Removed whenever a thread's read hold count drops to 0.
+ */
+ ThreadLocalHoldCounter readHolds;
+
+ /**
+ * The hold count of the last thread to successfully acquire
+ * readLock. This saves ThreadLocal lookup in the common case
+ * where the next thread to release is the last one to
+ * acquire.
+ */
+ HoldCounter cachedHoldCounter;
+
+ /**
+ * firstReader is the first thread to have acquired the read lock.
+ * firstReaderHoldCount is firstReader's hold count.
+ *
+ * <p>More precisely, firstReader is the unique thread that last
+ * changed the shared count from 0 to 1, and has not released the
+ * read lock since then; NULL if there is no such thread.
+ *
+ * <p>This allows tracking of read holds for uncontended read
+ * locks to be very cheap.
+ */
+ Thread* firstReader;
+ int firstReaderHoldCount;
+
public:
+ Sync() : readHolds(), cachedHoldCounter(), firstReader(NULL),
firstReaderHoldCount(0) {}
+
+ virtual bool isFair() const = 0;
+
+ /*
+ * Note that tryRelease and tryAcquire can be called by
+ * Conditions. So it is possible that their arguments contain
+ * both read and write holds that are all released during a
+ * condition wait and re-established in tryAcquire.
+ */
+
+ virtual bool tryRelease(int releases) {
+
+ if (!this->isHeldExclusively()) {
+ throw IllegalMonitorStateException(__FILE__, __LINE__, "Sync
lock not held exclusively");
+ }
+
+ int nextc = getState() - releases;
+ bool free = exclusiveCount(nextc) == 0;
+ if (free) {
+ setExclusiveOwnerThread(NULL);
+ }
+ setState(nextc);
+ return free;
+ }
+
+ bool tryAcquire(int acquires) {
+ /*
+ * Walkthrough:
+ * 1. If read count is nonzero or write count is nonzero
+ * and owner is a different thread, fail.
+ * 2. If count would saturate, throw RuntimeException. (This can
+ * only happen if count is already nonzero.)
+ * 3. Otherwise, this thread is eligible for lock if it is
+ * either a reentrant acquire or queue policy allows it.
+ * If so, update state and set owner.
+ */
+ Thread* current = Thread::currentThread();
+ int c = getState();
+ int w = exclusiveCount(c);
+
+ if (c != 0) {
+ // (Note: if c != 0 and w == 0 then shared count != 0)
+ if (w == 0 || current != getExclusiveOwnerThread()) {
+ return false;
+ }
+ if (w + exclusiveCount(acquires) > MAX_COUNT) {
+ throw RuntimeException(__FILE__, __LINE__, "Maximum lock count exceeded"); // by value, matching the other throw sites in this class
+ }
+ // Reentrant acquire
+ setState(c + acquires);
+ return true;
+ }
+
+ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
+ return false;
+ }
+
+ setExclusiveOwnerThread(current);
+ return true;
+ }
+
+ bool tryReleaseShared(int unused DECAF_UNUSED) { // drops one shared hold; returns true when the resulting state is 0
+ Thread* current = Thread::currentThread();
+ if (firstReader == current) {
+ if (firstReaderHoldCount == 1) {
+ firstReader = NULL;
+ } else {
+ firstReaderHoldCount--;
+ }
+ } else {
+ HoldCounter rh = cachedHoldCounter; // by-value copy of the cached counter
+ if (rh.thread == NULL || rh.thread != current) {
+ rh = readHolds.get();
+ }
+ int count = rh.count;
+ if (count <= 1) {
+ readHolds.remove();
+ if (count <= 0) {
+ throw IllegalMonitorStateException(
+ __FILE__, __LINE__, "attempt to unlock read lock,
not locked by current thread");
+ }
+ }
+ --rh.count; // NOTE(review): rh is a local copy and is not stored back into readHolds or cachedHoldCounter in this method - confirm the decrement is not lost
+ }
+
+ for (;;) {
+ int c = getState();
+ int nextc = c - SHARED_UNIT;
+ if (compareAndSetState(c, nextc)) {
+ // Releasing the read lock has no effect on readers, but
it may allow
+ // waiting writers to proceed if both read and write locks
are now free.
+ return nextc == 0;
+ }
+ }
+ }
+
+ virtual int tryAcquireShared(int unused DECAF_UNUSED) {
+ /*
+ * Walk through:
+ * 1. If write lock held by another thread, fail.
+ * 2. Otherwise, this thread is eligible for lock wrt state, so
+ * ask if it should block because of queue policy. If not, try
+ * to grant by CASing state and updating count. Note that this step
+ * does not check for reentrant acquires, which is postponed to
+ * full version to avoid having to check hold count in the more
+ * typical non-reentrant case.
+ * 3. If step 2 fails either because thread apparently not eligible
+ * or CAS fails or count saturated, chain to version with full
+ * retry loop.
+ */
+ Thread* current = Thread::currentThread();
+ int c = getState();
+ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() !=
current) {
+ return -1;
+ }
+ int r = sharedCount(c);
+ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c,
c + SHARED_UNIT)) {
+ if (r == 0) {
+ firstReader = current;
+ firstReaderHoldCount = 1;
+ } else if (firstReader == current) {
+ firstReaderHoldCount++;
+ } else {
+ HoldCounter rh = cachedHoldCounter; // by-value copy of the cached counter
+ if (rh.thread == NULL || rh.thread != current) {
+ cachedHoldCounter = rh = readHolds.get();
+ } else if (rh.count == 0) {
+ readHolds.set(rh);
+ }
+
+ rh.count++; // NOTE(review): increments the local copy after it was stored above; neither cachedHoldCounter nor readHolds is updated afterwards - confirm
+ }
+ return 1;
+ }
+
+ return fullTryAcquireShared(current);
+ }
+
+ /**
+ * Full version of acquire for reads, that handles CAS misses
+ * and reentrant reads not dealt with in tryAcquireShared.
+ */
+ int fullTryAcquireShared(Thread* current) {
+
+ HoldCounter rh; // NOTE(review): the default constructor sets rh.thread from Thread::currentThread(), so the rh.thread == NULL guards below look like they never fire - confirm
+ for (;;) {
+ int c = getState();
+ if (exclusiveCount(c) != 0) {
+ if (getExclusiveOwnerThread() != current) {
+ return -1;
+ }
+ // else we hold the exclusive lock; blocking here
+ // would cause deadlock.
+ } else if (readerShouldBlock()) {
+ // Make sure we're not acquiring read lock reentrantly
+ if (firstReader == current) {
+ // assert firstReaderHoldCount > 0;
+ } else {
+ if (rh.thread == NULL) {
+ rh = cachedHoldCounter;
+ if (rh.thread == NULL || rh.thread != current) {
+ rh = readHolds.get();
+ if (rh.count == 0) {
+ readHolds.remove();
+ }
+ }
+ }
+
+ if (rh.count == 0) {
+ return -1;
+ }
+ }
+ }
+ if (sharedCount(c) == MAX_COUNT) {
+ throw Exception(__FILE__, __LINE__, "Maximum lock count
exceeded");
+ }
+ if (compareAndSetState(c, c + SHARED_UNIT)) {
+ if (sharedCount(c) == 0) {
+ firstReader = current;
+ firstReaderHoldCount = 1;
+ } else if (firstReader == current) {
+ firstReaderHoldCount++;
+ } else {
+ if (rh.thread == NULL) {
+ rh = cachedHoldCounter;
+ }
+ if (rh.thread == NULL || rh.thread != current) {
+ rh = readHolds.get();
+ } else if (rh.count == 0) {
+ readHolds.set(rh);
+ }
+ rh.count++; // NOTE(review): rh is a local copy; the incremented value reaches cachedHoldCounter below but is not stored into readHolds - confirm
+ cachedHoldCounter = rh; // cache for release
+ }
+ return 1;
+ }
+ }
+ }
+
+ /**
+ * Performs tryLock for write, enabling barging in both modes.
+ * This is identical in effect to tryAcquire except for lack
+ * of calls to writerShouldBlock.
+ */
+ bool tryWriteLock() { // returns true if the write lock was taken (or re-entered) without consulting writerShouldBlock
+ Thread* current = Thread::currentThread();
+ int c = getState();
+ if (c != 0) {
+ int w = exclusiveCount(c);
+ if (w == 0 || current != getExclusiveOwnerThread()) {
+ return false; // readers hold the lock, or another thread owns the write lock
+ }
+ if (w == MAX_COUNT) {
+ throw Exception(__FILE__, __LINE__, "Maximum lock count exceeded"); // by value, matching the other throw sites in this class
+ }
+ }
+ if (!compareAndSetState(c, c + 1)) {
+ return false;
+ }
+ setExclusiveOwnerThread(current);
+ return true;
+ }
+
+ /**
+ * Performs tryLock for read, enabling barging in both modes.
+ * This is identical in effect to tryAcquireShared except for
+ * lack of calls to readerShouldBlock.
+ */
+ bool tryReadLock() {
+ Thread* current = Thread::currentThread();
+ for (;;) {
+ int c = getState();
+ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() !=
current) {
+ return false;
+ }
+ int r = sharedCount(c);
+ if (r == MAX_COUNT) {
+ throw Exception(__FILE__, __LINE__, "Maximum lock count
exceeded");
+ }
+ if (compareAndSetState(c, c + SHARED_UNIT)) {
+ if (r == 0) {
+ firstReader = current;
+ firstReaderHoldCount = 1;
+ } else if (firstReader == current) {
+ firstReaderHoldCount++;
+ } else {
+ HoldCounter rh = cachedHoldCounter; // by-value copy of the cached counter
+ if (rh.thread == NULL || rh.thread != current) {
+ cachedHoldCounter = rh = readHolds.get();
+ } else if (rh.count == 0) {
+ readHolds.set(rh);
+ }
+ rh.count++; // NOTE(review): increments the local copy after it was stored above; neither cachedHoldCounter nor readHolds is updated afterwards - confirm
+ }
+ return true;
+ }
+ }
+ }
+
+ virtual bool isHeldExclusively() const {
+ // While we must in general read state before owner, we don't need
to do
+ // so to check if current thread is owner
+ return getExclusiveOwnerThread() == Thread::currentThread();
+ }
+
+ ConditionObject* newCondition() {
+ return AbstractQueuedSynchronizer::createDefaultConditionObject();
+ }
+
+ Thread* getOwner() {
+ // Must read state before owner to ensure memory consistency
+ return ((exclusiveCount(getState()) == 0)? NULL :
getExclusiveOwnerThread());
+ }
+
+ int getReadLockCount() {
+ return sharedCount(getState());
+ }
+
+ bool isWriteLocked() {
+ return exclusiveCount(getState()) != 0;
+ }
+
+ int getWriteHoldCount() {
+ return isHeldExclusively() ? exclusiveCount(getState()) : 0;
+ }
+
+ int getReadHoldCount() {
+ if (getReadLockCount() == 0) {
+ return 0;
+ }
+
+ Thread* current = Thread::currentThread();
+ if (firstReader == current) {
+ return firstReaderHoldCount;
+ }
+
+ HoldCounter rh = cachedHoldCounter;
+ if (rh.thread != NULL && rh.thread == current) {
+ return rh.count;
+ }
+
+ int count = readHolds.get().count;
+ if (count == 0) {
+ readHolds.remove();
+ }
+ return count;
+ }
+
+ int getCount() {
+ return getState();
+ }
+
+ protected:
+
+ /**
+ * @returns true if the current thread, when trying to acquire the
read lock,
+ * and otherwise eligible to do so, should block because of
policy for
+ * overtaking other waiting threads.
+ */
+ virtual bool readerShouldBlock() const = 0;
+
+ /**
+ * @returns true if the current thread, when trying to acquire the
write lock,
+ * and otherwise eligible to do so, should block because of
policy for
+ * overtaking other waiting threads.
+ */
+ virtual bool writerShouldBlock() const = 0;
+
};
const int Sync::SHARED_SHIFT = 16;
@@ -68,11 +454,38 @@ namespace {
class FairSync : public Sync {
public:
+ virtual ~FairSync() {}
+
+ virtual bool readerShouldBlock() const {
+ return this->hasQueuedPredecessors();
+ }
+
+ virtual bool writerShouldBlock() const {
+ return this->hasQueuedPredecessors();
+ }
+
+ virtual bool isFair() const {
+ return true;
+ }
};
class NonFairSync : public Sync {
public:
+ virtual ~NonFairSync() {}
+
+ virtual bool readerShouldBlock() const {
+ return false;
+ }
+
+ virtual bool writerShouldBlock() const {
+ return false;
+ // TODO - add apparentlyFirstQueuedIsExclusive (NOTE(review): the JDK applies that check in readerShouldBlock, not here - confirm placement)
+ }
+
+ virtual bool isFair() const {
+ return false;
+ }
};
class ReadLock : public Lock {
@@ -83,6 +496,168 @@ namespace {
ReadLock(Sync* sync) : Lock(), sync(sync) {
}
+ /**
+ * Acquires the read lock.
+ *
+ * <p>Acquires the read lock if the write lock is not held by
+ * another thread and returns immediately.
+ *
+ * <p>If the write lock is held by another thread then
+ * the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until the read lock has been acquired.
+ */
+ virtual void lock() {
+ sync->acquireShared(1);
+ }
+
+ /**
+ * Acquires the read lock unless the current thread is interrupted.
+ *
+ * <p>Acquires the read lock if the write lock is not held
+ * by another thread and returns immediately.
+ *
+ * <p>If the write lock is held by another thread then the
+ * current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until one of two things happens:
+ *
+ * <ul>
+ * <li>The read lock is acquired by the current thread; or
+ * <li>Some other thread interrupts the current thread.
+ * </ul>
+ *
+ * <p>If the current thread:
+ *
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is interrupted while acquiring the read lock,
+ * </ul>
+ *
+ * then InterruptedException is thrown and the current thread's interrupted
+ * status is cleared.
+ *
+ * <p>In this implementation, as this method is an explicit
interruption
+ * point, preference is given to responding to the interrupt over
normal
+ * or reentrant acquisition of the lock.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ void lockInterruptibly() {
+ sync->acquireSharedInterruptibly(1);
+ }
+
+ /**
+ * Acquires the read lock only if the write lock is not held by
+ * another thread at the time of invocation.
+ *
+ * <p>Acquires the read lock if the write lock is not held by
+ * another thread and returns immediately with the value
+ * {@code true}. Even when this lock has been set to use a
+ * fair ordering policy, a call to {@code tryLock()}
+ * <em>will</em> immediately acquire the read lock if it is
+ * available, whether or not other threads are currently
+ * waiting for the read lock. This "barging" behavior
+ * can be useful in certain circumstances, even though it
+ * breaks fairness. If you want to honor the fairness setting
+ * for this lock, then use {@link #tryLock(long, TimeUnit)
+ * tryLock(0, TimeUnit.SECONDS) } which is almost equivalent
+ * (it also detects interruption).
+ *
+ * <p>If the write lock is held by another thread then
+ * this method will return immediately with the value
+ * {@code false}.
+ *
+ * @return {@code true} if the read lock was acquired
+ */
+ virtual bool tryLock() {
+ return sync->tryReadLock();
+ }
+
+ /**
+ * Acquires the read lock if the write lock is not held by
+ * another thread within the given waiting time and the
+ * current thread has not been interrupted.
+ *
+ * <p>Acquires the read lock if the write lock is not held by
+ * another thread and returns immediately with the value
+ * {@code true}. If this lock has been set to use a fair
+ * ordering policy then an available lock <em>will not</em> be
+ * acquired if any other threads are waiting for the
+ * lock. This is in contrast to the {@link #tryLock()}
+ * method. If you want a timed {@code tryLock} that does
+ * permit barging on a fair lock then combine the timed and
+ * un-timed forms together:
+ *
+ * <pre>if (lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }
+ * </pre>
+ *
+ * <p>If the write lock is held by another thread then the
+ * current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until one of three things happens:
+ *
+ * <ul>
+ * <li>The read lock is acquired by the current thread; or
+ * <li>Some other thread interrupts the current thread; or
+ * <li>The specified waiting time elapses.
+ * </ul>
+ *
+ * <p>If the read lock is acquired then the value {@code true} is
returned.
+ * <p>If the current thread:
+ *
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is interrupted while acquiring the read lock,
+ * </ul> then InterruptedException is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>If the specified waiting time elapses then the value
+ * {@code false} is returned. If the time is less than or
+ * equal to zero, the method will not wait at all.
+ *
+ * <p>In this implementation, as this method is an explicit
+ * interruption point, preference is given to responding to
+ * the interrupt over normal or reentrant acquisition of the
+ * lock, and over reporting the elapse of the waiting time.
+ *
+ * @param timeout the time to wait for the read lock
+ * @param unit the time unit of the timeout argument
+ *
+ * @return {@code true} if the read lock was acquired
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ virtual bool tryLock(long long timeout, const TimeUnit& unit) {
+ return sync->tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ }
+
+ /**
+ * Attempts to release this lock.
+ *
+ * <p> If the number of readers is now zero then the lock
+ * is made available for write lock attempts.
+ */
+ virtual void unlock() {
+ sync->releaseShared(1);
+ }
+
+ /**
+ * Throws UnsupportedOperationException because ReadLocks do not
support conditions.
+ * @throws UnsupportedOperationException always
+ */
+ virtual Condition* newCondition() {
+ throw UnsupportedOperationException(); // thrown by value so catch-by-reference handlers see it and nothing leaks
+ }
+
+ /**
+ * Returns a string identifying this lock, as well as its lock state.
+ * The state, in brackets, includes the String {@code "Read locks ="}
+ * followed by the number of held read locks.
+ *
+ * @return a string identifying this lock, as well as its lock state
+ */
+ virtual std::string toString() const {
+ int r = sync->getReadLockCount();
+ return std::string("[Read locks = ") + Integer::toString(r) + "]";
+ }
};
class WriteLock : public Lock {
@@ -93,6 +668,249 @@ namespace {
WriteLock(Sync* sync) : Lock(), sync(sync) {
}
+ /**
+ * Acquires the write lock.
+ *
+ * <p>Acquires the write lock if neither the read nor write lock
+ * are held by another thread
+ * and returns immediately, setting the write lock hold count to
+ * one.
+ *
+ * <p>If the current thread already holds the write lock then the
+ * hold count is incremented by one and the method returns
+ * immediately.
+ *
+ * <p>If the lock is held by another thread then the current
+ * thread becomes disabled for thread scheduling purposes and
+ * lies dormant until the write lock has been acquired, at which
+ * time the write lock hold count is set to one.
+ */
+ virtual void lock() {
+ sync->acquire(1);
+ }
+
+ /**
+ * Acquires the write lock unless the current thread is interrupted
+ *
+ * <p>Acquires the write lock if neither the read nor write lock are
held
+ * by another thread and returns immediately, setting the write lock
hold
+ * count to one.
+ *
+ * <p>If the current thread already holds this lock then the hold count
+ * is incremented by one and the method returns immediately.
+ *
+ * <p>If the lock is held by another thread then the current thread
+ * becomes disabled for thread scheduling purposes and lies dormant
until
+ * one of two things happens:
+ *
+ * <ul>
+ * <li>The write lock is acquired by the current thread; or
+ * <li>Some other thread interrupts the current thread.
+ * </ul>
+ *
+ * <p>If the write lock is acquired by the current thread then the
+ * lock hold count is set to one.
+ *
+ * <p>If the current thread:
+ *
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is interrupted while acquiring the write lock,
+ * </ul>
+ *
+ * then InterruptedException is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>In this implementation, as this method is an explicit
interruption
+ * point, preference is given to responding to the interrupt over
normal or
+ * reentrant acquisition of the lock.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ virtual void lockInterruptibly() {
+ sync->acquireInterruptibly(1);
+ }
+
+ /**
+ * Acquires the write lock only if it is not held by another thread
+ * at the time of invocation.
+ *
+ * <p>Acquires the write lock if neither the read nor write lock are
held
+ * by another thread and returns immediately with the value {@code
true},
+ * setting the write lock hold count to one. Even when this lock has
+ * been set to use a fair ordering policy, a call to tryLock() will immediately
+ * acquire the lock if it is available, whether or not other threads
are
+ * currently waiting for the write lock. This "barging"
behavior
+ * can be useful in certain circumstances, even though it breaks
fairness.
+ * If you want to honor the fairness setting for this lock, then use
+ * tryLock(0, TimeUnit.SECONDS) which is almost equivalent (it also
+ * detects interruption).
+ *
+ * <p> If the current thread already holds this lock then the hold
count is
+ * incremented by one and the method returns true.
+ *
+ * <p>If the lock is held by another thread then this method will
return
+ * immediately with the value {@code false}.
+ *
+ * @return true if the lock was free and was acquired by the current thread, or
+ * the write lock was already held by the current thread; and false
otherwise.
+ */
+ virtual bool tryLock() {
+ return sync->tryWriteLock();
+ }
+
+ /**
+ * Acquires the write lock if it is not held by another thread
+ * within the given waiting time and the current thread has
+ * not been interrupted.
+ *
+ * <p>Acquires the write lock if neither the read nor write lock
+ * are held by another thread
+ * and returns immediately with the value true,
+ * setting the write lock hold count to one. If this lock has been
+ * set to use a fair ordering policy then an available lock
+ * <em>will not</em> be acquired if any other threads are
+ * waiting for the write lock. This is in contrast to the
+ * tryLock() method. If you want a timed tryLock
+ * that does permit barging on a fair lock then combine the
+ * timed and un-timed forms together:
+ *
+ * <pre>if (lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }
+ * </pre>
+ *
+ * <p>If the current thread already holds this lock then the
+ * hold count is incremented by one and the method returns
+ * true.
+ *
+ * <p>If the lock is held by another thread then the current
+ * thread becomes disabled for thread scheduling purposes and
+ * lies dormant until one of three things happens:
+ *
+ * <ul>
+ * <li>The write lock is acquired by the current thread; or
+ * <li>Some other thread interrupts the current thread; or
+ * <li>The specified waiting time elapses
+ * </ul>
+ *
+ * <p>If the write lock is acquired then the value {@code true} is
+ * returned and the write lock hold count is set to one.
+ *
+ * <p>If the current thread:
+ *
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is interrupted while acquiring the write lock,
+ * </ul>
+ *
+ * then InterruptedException is thrown and the current
+ * thread's interrupted status is cleared.
+ *
+ * <p>If the specified waiting time elapses then the value
+ * false is returned. If the time is less than or
+ * equal to zero, the method will not wait at all.
+ *
+ * <p>In this implementation, as this method is an explicit
+ * interruption point, preference is given to responding to
+ * the interrupt over normal or reentrant acquisition of the
+ * lock, and over reporting the elapse of the waiting time.
+ *
+ * @param timeout the time to wait for the write lock
+ * @param unit the time unit of the timeout argument
+ *
+ * @return true if the lock was free and was acquired
+ * by the current thread, or the write lock was already held by the
+ * current thread; and false if the waiting time
+ * elapsed before the lock could be acquired.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ virtual bool tryLock(long long timeout, const TimeUnit& unit) {
+ return sync->tryAcquireNanos(1, unit.toNanos(timeout));
+ }
+
+ /**
+ * Attempts to release this lock.
+ *
+ * <p>If the current thread is the holder of this lock then the hold
+ * count is decremented. If the hold count is now zero then the lock
+ * is released. If the current thread is not the holder of this lock
+ * then IllegalMonitorStateException is thrown.
+ *
+ * @throws IllegalMonitorStateException if the current thread does not
+ * hold this lock.
+ */
+ virtual void unlock() {
+ sync->release(1);
+ }
+
+ /**
+ * Returns a Condition instance for use with this Lock instance.
+ *
+ * <p>The returned Condition instance supports the same usages as do
the
+ * Mutex methods wait, notify, and notifyAll when used with the
built-in
+ * mutex lock.
+ *
+ * <ul>
+ * <li>If this write lock is not held when any Condition method is
called
+ * then an IllegalMonitorStateException is thrown. (Read locks are
+ * held independently of write locks, so are not checked or affected.
+ * However it is essentially always an error to invoke a condition
waiting
+ * method when the current thread has also acquired read locks,
since other
+ * threads that could unblock it will not be able to acquire the
write
+ * lock.)
+ * <li>When the condition waiting methods are called the write lock
is
+ * released and, before they return, the write lock is reacquired
and the
+ * lock hold count restored to what it was when the method was
called.
+ * <li>If a thread is interrupted while waiting then the wait will
terminate,
+ * an InterruptedException will be thrown, and the thread's
interrupted
+ * status will be cleared.
+ * <li> Waiting threads are signalled in FIFO order.
+ * <li>The ordering of lock reacquisition for threads returning from
+ * waiting methods is the same as for threads initially acquiring
the lock,
+ * which is in the default case not specified, but for <em>fair</em>
locks
+ * favors those threads that have been waiting the longest.
+ * </ul>
+ *
+ * @return the Condition object
+ */
+ virtual Condition* newCondition() {
+ return sync->newCondition();
+ }
+
+ /**
+ * Returns a string identifying this lock, as well as its lock state.
The
+ * state, in brackets includes either the String "Unlocked" or the
String
+ * "Locked by" followed by the name of the owning thread.
+ *
+ * @return a string identifying this lock, as well as its lock state
+ */
+ virtual std::string toString() const {
+ Thread* o = sync->getOwner();
+ return std::string("Lock") + ((o == NULL) ? "[Unlocked]" :
+ "[Locked by thread " +
o->getName() + "]");
+ }
+
+ /**
+ * Queries if this write lock is held by the current thread.
Identical in
+ * effect to isWriteLockedByCurrentThread.
+ *
+ * @return true if the current thread holds this lock and false
otherwise
+ */
+ virtual bool isHeldByCurrentThread() const {
+ return sync->isHeldExclusively();
+ }
+
+ /**
+ * Queries the number of holds on this write lock by the current
thread.
+ * A thread has a hold on a lock for each lock action that is not
matched
+ * by an unlock action. Identical in effect to getWriteHoldCount.
+ *
+ * @return the number of holds on this lock by the current thread,
+ * or zero if this lock is not held by the current thread
+ */
+ virtual int getHoldCount() const {
+ return sync->getWriteHoldCount();
+ }
};
}
@@ -116,8 +934,8 @@ namespace locks {
sync = new NonFairSync();
}
-// readLock = new ReadLock(sync);
-// writeLock = new WriteLock(sync);
+ readLock = new ReadLock(sync);
+ writeLock = new WriteLock(sync);
}
~ReentrantReadWriteLockImpl() {
@@ -130,7 +948,7 @@ namespace locks {
}}}}
////////////////////////////////////////////////////////////////////////////////
-ReentrantReadWriteLock::ReentrantReadWriteLock() : ReadWriteLock(), impl(new
ReentrantReadWriteLockImpl(true)) {
+ReentrantReadWriteLock::ReentrantReadWriteLock() : ReadWriteLock(), impl(new
ReentrantReadWriteLockImpl(false)) {
}
////////////////////////////////////////////////////////////////////////////////
@@ -155,3 +973,130 @@ Lock& ReentrantReadWriteLock::readLock()
Lock& ReentrantReadWriteLock::writeLock() {
throw "";
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::isFair() const {
+ return this->impl->sync->isFair();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread* ReentrantReadWriteLock::getOwner() const {
+ return this->impl->sync->getOwner();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getReadLockCount() const {
+ return this->impl->sync->getReadLockCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::isWriteLocked() const {
+ return this->impl->sync->isWriteLocked();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::isWriteLockedByCurrentThread() const {
+ return this->impl->sync->isHeldExclusively();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getWriteHoldCount() const {
+ return this->impl->sync->getWriteHoldCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getReadHoldCount() const {
+ return this->impl->sync->getReadHoldCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<Thread*>* ReentrantReadWriteLock::getQueuedWriterThreads() const {
+ return this->impl->sync->getExclusiveQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<Thread*>* ReentrantReadWriteLock::getQueuedReaderThreads() const {
+ return this->impl->sync->getSharedQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::hasQueuedThreads() const {
+ return this->impl->sync->hasQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::hasQueuedThread(Thread* thread) const {
+ return this->impl->sync->isQueued(thread);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getQueueLength() const {
+ return this->impl->sync->getQueueLength();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<Thread*>* ReentrantReadWriteLock::getQueuedThreads() const {
+ return this->impl->sync->getQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::hasWaiters(Condition* condition) const {
+
+ if (condition == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "The Condition to check
was NULL");
+ }
+
+ const AbstractQueuedSynchronizer::ConditionObject* cond =
+ dynamic_cast<const
AbstractQueuedSynchronizer::ConditionObject*>(condition);
+
+ if (cond == NULL) {
+ throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not
associated with this Lock");
+ }
+
+ return this->impl->sync->hasWaiters(cond);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getWaitQueueLength(Condition* condition) const {
+
+ if (condition == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "The Condition to check
was NULL");
+ }
+
+ const AbstractQueuedSynchronizer::ConditionObject* cond =
+ dynamic_cast<const
AbstractQueuedSynchronizer::ConditionObject*>(condition);
+
+ if (cond == NULL) {
+ throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not
associated with this Lock");
+ }
+
+ return this->impl->sync->getWaitQueueLength(cond);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<decaf::lang::Thread*>*
ReentrantReadWriteLock::getWaitingThreads(Condition* condition) const {
+
+ if (condition == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "The Condition to check
was NULL");
+ }
+
+ const AbstractQueuedSynchronizer::ConditionObject* cond =
+ dynamic_cast<const
AbstractQueuedSynchronizer::ConditionObject*>(condition);
+
+ if (cond == NULL) {
+ throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not
associated with this Lock");
+ }
+
+ return this->impl->sync->getWaitingThreads(cond);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string ReentrantReadWriteLock::toString() const {
+ int c = this->impl->sync->getCount();
+ int w = this->impl->sync->exclusiveCount(c);
+ int r = this->impl->sync->sharedCount(c);
+
+ return std::string("ReentrantReadWriteLock: ") +
+ "[Write locks = " + Integer::toString(w) +
+ ", Read locks = " + Integer::toString(r) + "]";
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h?rev=1346165&r1=1346164&r2=1346165&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h
Mon Jun 4 21:26:06 2012
@@ -20,6 +20,8 @@
#include <decaf/util/Config.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/util/Collection.h>
#include <decaf/util/concurrent/locks/ReadWriteLock.h>
namespace decaf {
@@ -67,6 +69,199 @@ namespace locks {
*/
virtual Lock& writeLock();
+ /**
+ * Returns true if this lock has fairness set true.
+ *
+ * @returns true if the Lock uses a fair policy otherwise false.
+ */
+ bool isFair() const;
+
+ /**
+ * Queries the number of read locks held for this lock. This method is designed
+ * for use in monitoring system state, not for synchronization control.
+ *
+ * @return the number of read locks held.
+ */
+ int getReadLockCount() const;
+
+ /**
+ * Queries if the write lock is held by any thread. This method is designed for
+ * use in monitoring system state, not for synchronization control.
+ *
+ * @return true if any thread holds the write lock and false otherwise
+ */
+ bool isWriteLocked() const;
+
+ /**
+ * Queries if the write lock is held by the current thread.
+ *
+ * @return true if the current thread holds the write lock and false otherwise
+ */
+ bool isWriteLockedByCurrentThread() const;
+
+ /**
+ * Queries the number of reentrant write holds on this lock by the current thread.
+ * A writer thread has a hold on a lock for each lock action that is not matched
+ * by an unlock action.
+ *
+ * @return the number of holds on the write lock by the current thread,
+ * or zero if the write lock is not held by the current thread
+ */
+ int getWriteHoldCount() const;
+
+ /**
+ * Queries the number of reentrant read holds on this lock by the current thread.
+ * A reader thread has a hold on a lock for each lock action that is not matched
+ * by an unlock action.
+ *
+ * @return the number of holds on the read lock by the current thread,
+ * or zero if the read lock is not held by the current thread
+ */
+ int getReadHoldCount() const;
+
+ /**
+ * Queries whether any threads are waiting on the given condition associated with
+ * the write lock. Note that because timeouts and interrupts may occur at any time,
+ * a true return does not guarantee that a future signal will awaken any threads.
+ * This method is designed primarily for use in monitoring of the system state.
+ *
+ * @param condition
+ * The condition to be queried for waiters.
+ *
+ * @return true if there are any waiting threads
+ *
+ * @throws NullPointerException if the ConditionObject pointer is NULL.
+ * @throws IllegalArgumentException if the ConditionObject is not associated with this Lock.
+ * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+ */
+ bool hasWaiters(Condition* condition) const;
+
+ /**
+ * Gets an estimated count of the number of threads that are currently waiting on the given
+ * Condition object. This value changes dynamically, so the result of this method can be invalid
+ * immediately after it is called. The Condition object must be associated with this Lock
+ * or an exception will be thrown.
+ *
+ * @returns an estimate of the number of waiting threads.
+ *
+ * @throws NullPointerException if the ConditionObject pointer is NULL.
+ * @throws IllegalArgumentException if the ConditionObject is not associated with this Lock.
+ * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+ */
+ int getWaitQueueLength(Condition* condition) const;
+
+ /**
+ * Returns a string identifying this lock, as well as its lock state. The state,
+ * in brackets, includes the String "Write locks =" followed by the number of
+ * reentrantly held write locks, and the String "Read locks =" followed by the
+ * number of held read locks.
+ *
+ * @return a string identifying this lock, as well as its lock state
+ */
+ std::string toString() const;
+
+ protected:
+
+ /**
+ * Creates and returns a new Collection object that contains all the threads that may be waiting
+ * on the given Condition object instance at the time this method is called.
+ *
+ * @returns a Collection pointer that contains waiting threads on the given Condition object.
+ * The caller owns the returned pointer.
+ *
+ * @throws NullPointerException if the ConditionObject pointer is NULL.
+ * @throws IllegalArgumentException if the ConditionObject is not associated with this Lock.
+ * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+ */
+ decaf::util::Collection<decaf::lang::Thread*>* getWaitingThreads(Condition* condition) const;
+
+ /**
+ * Queries whether any threads are waiting to acquire the read or write lock.
+ * Note that because cancellations may occur at any time, a true return does
+ * not guarantee that any other thread will ever acquire a lock. This method
+ * is designed primarily for use in monitoring of the system state.
+ *
+ * @return true if there may be other threads waiting to acquire the lock
+ */
+ bool hasQueuedThreads() const;
+
+ /**
+ * Queries whether the given thread is waiting to acquire either the read or
+ * write lock. Note that because cancellations may occur at any time, a true
+ * return does not guarantee that this thread will ever acquire a lock. This
+ * method is designed primarily for use in monitoring of the system state.
+ *
+ * @param thread
+ * The thread that will be queried for.
+ *
+ * @return true if the given thread is queued waiting for this lock
+ *
+ * @throws NullPointerException if the thread is NULL.
+ */
+ bool hasQueuedThread(decaf::lang::Thread* thread) const;
+
+ /**
+ * Returns an estimate of the number of threads waiting to acquire either the
+ * read or write lock. The value is only an estimate because the number of
+ * threads may change dynamically while this method traverses internal data
+ * structures. This method is designed for use in monitoring of the system
+ * state, not for synchronization control.
+ *
+ * @return the estimated number of threads waiting for this lock
+ */
+ int getQueueLength() const;
+
+ protected:
+
+ /**
+ * Returns a collection containing threads that may be waiting to acquire either
+ * the read or write lock. Because the actual set of threads may change dynamically
+ * while constructing this result, the returned collection is only a best-effort
+ * estimate. The elements of the returned collection are in no particular order.
+ * This method is designed to facilitate construction of subclasses that provide
+ * more extensive monitoring facilities.
+ *
+ * @return the collection of threads
+ */
+ decaf::util::Collection<decaf::lang::Thread*>* getQueuedThreads() const;
+
+ /**
+ * Returns a collection containing threads that may be waiting to acquire the
+ * write lock. Because the actual set of threads may change dynamically
+ * while constructing this result, the returned collection is only a best-effort
+ * estimate. The elements of the returned collection are in no particular order.
+ * This method is designed to facilitate construction of subclasses that provide
+ * more extensive lock monitoring facilities.
+ *
+ * @return the collection of threads
+ */
+ decaf::util::Collection<decaf::lang::Thread*>* getQueuedWriterThreads() const;
+
+ /**
+ * Returns a collection containing threads that may be waiting to acquire the
+ * read lock. Because the actual set of threads may change dynamically while
+ * constructing this result, the returned collection is only a best-effort estimate.
+ * The elements of the returned collection are in no particular order. This method
+ * is designed to facilitate construction of subclasses that provide more extensive
+ * lock monitoring facilities.
+ *
+ * @return the collection of threads
+ */
+ decaf::util::Collection<decaf::lang::Thread*>* getQueuedReaderThreads() const;
+
+ /**
+ * Returns the thread that currently owns the write lock, or NULL if not
+ * owned. When this method is called by a thread that is not the owner, the
+ * return value reflects a best-effort approximation of current lock status.
+ * For example, the owner may be momentarily NULL even if there are threads
+ * trying to acquire the lock but have not yet done so. This method is
+ * designed to facilitate construction of subclasses that provide more
+ * extensive lock monitoring facilities.
+ *
+ * @return the owner thread pointer, or NULL if not currently owned.
+ */
+ decaf::lang::Thread* getOwner() const;
+
};
}}}}