Author: rajdavies
Date: Thu May 29 04:27:33 2008
New Revision: 661295
URL: http://svn.apache.org/viewvc?rev=661295&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1748
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=661295&r1=661294&r2=661295&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu May 29 04:27:33 2008
@@ -1062,7 +1062,12 @@
}
final void sendMessage(final ConnectionContext context, Message msg)
throws Exception {
- messages.addMessageLast(msg);
+ if (!msg.isPersistent() && messages.getSystemUsage() != null) {
+ messages.getSystemUsage().getTempUsage().waitForSpace();
+ }
+ synchronized(messages) {
+ messages.addMessageLast(msg);
+ }
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=661295&r1=661294&r2=661295&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Thu May 29 04:27:33 2008
@@ -21,7 +21,6 @@
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -60,7 +59,6 @@
private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean();
private MessageReference last = null;
- private ReentrantLock lock = new ReentrantLock(true);
/**
* @param name
@@ -95,25 +93,20 @@
/**
* @return true if there are no pending messages
*/
- public boolean isEmpty() {
- lock.lock();
- try {
- if(memoryList.isEmpty() && isDiskListEmpty()){
- return true;
- }
- for (Iterator<MessageReference> iterator = memoryList.iterator();
iterator.hasNext();) {
- MessageReference node = iterator.next();
- if (node== QueueMessageReference.NULL_MESSAGE){
- continue;
- }
- if (!node.isDropped()) {
- return false;
- }
- // We can remove dropped references.
- iterator.remove();
+ public synchronized boolean isEmpty() {
+ if(memoryList.isEmpty() && isDiskListEmpty()){
+ return true;
+ }
+ for (Iterator<MessageReference> iterator = memoryList.iterator();
iterator.hasNext();) {
+ MessageReference node = iterator.next();
+ if (node== QueueMessageReference.NULL_MESSAGE){
+ continue;
}
- } finally {
- lock.unlock();
+ if (!node.isDropped()) {
+ return false;
+ }
+ // We can remove dropped references.
+ iterator.remove();
}
return isDiskListEmpty();
}
@@ -123,71 +116,48 @@
/**
* reset the cursor
*/
- public void reset() {
- lock.lock();
- try {
- iterating = true;
- last = null;
- iter = isDiskListEmpty() ? memoryList.iterator() :
getDiskList().listIterator();
- } finally {
- lock.unlock();
- }
+ public synchronized void reset() {
+ iterating = true;
+ last = null;
+ iter = isDiskListEmpty() ? memoryList.iterator() :
getDiskList().listIterator();
}
- public void release() {
- lock.lock();
- try {
- synchronized(this) {
- iterating = false;
- this.notifyAll();
- }
- if (flushRequired) {
- flushRequired = false;
- flushToDisk();
- }
- } finally {
- lock.unlock();
+ public synchronized void release() {
+ iterating = false;
+ if (flushRequired) {
+ flushRequired = false;
+ flushToDisk();
}
}
- public void destroy() throws Exception {
- lock.lock();
- try {
- stop();
- for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext();) {
- Message node = (Message)i.next();
- node.decrementReferenceCount();
- }
- memoryList.clear();
- if (!isDiskListEmpty()) {
- getDiskList().clear();
- }
- } finally {
- lock.unlock();
+ public synchronized void destroy() throws Exception {
+ stop();
+ for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext();) {
+ Message node = (Message)i.next();
+ node.decrementReferenceCount();
+ }
+ memoryList.clear();
+ if (!isDiskListEmpty()) {
+ getDiskList().clear();
}
}
- public LinkedList<MessageReference> pageInList(int maxItems) {
- int count = 0;
+ public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList<MessageReference> result = new
LinkedList<MessageReference>();
- lock.lock();
- try {
- for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext() && count < maxItems;) {
- result.add(i.next());
+ int count = 0;
+ for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext()
&& count < maxItems;) {
+ result.add(i.next());
+ count++;
+ }
+ if (count < maxItems && !isDiskListEmpty()) {
+ for (Iterator<MessageReference> i = getDiskList().iterator();
i.hasNext() && count < maxItems;) {
+ Message message = (Message)i.next();
+ message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ message.incrementReferenceCount();
+ result.add(message);
count++;
}
- if (count < maxItems && !isDiskListEmpty()) {
- for (Iterator<MessageReference> i = getDiskList().iterator();
i.hasNext() && count < maxItems;) {
- Message message = (Message)i.next();
- message.setRegionDestination(regionDestination);
-
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
- message.incrementReferenceCount();
- result.add(message);
- count++;
- }
- }
- } finally {
- lock.unlock();
}
return result;
}
@@ -197,52 +167,35 @@
*
* @param node
*/
- public void addMessageLast(MessageReference node) {
+ public synchronized void addMessageLast(MessageReference node) {
if (!node.isExpired()) {
try {
- lock.lock();
- try {
- while (iterating) {
- lock.unlock();
- synchronized(this) {
- try {
- this.wait();
- } catch (InterruptedException ie) {}
- }
- lock.lock();
+ regionDestination = node.getMessage().getRegionDestination();
+ if (isDiskListEmpty()) {
+ if (hasSpace() || this.store==null) {
+ memoryList.add(node);
+ node.incrementReferenceCount();
+ return;
}
- regionDestination =
node.getMessage().getRegionDestination();
+ }
+ if (!hasSpace()) {
if (isDiskListEmpty()) {
- if (hasSpace() || this.store==null) {
+ expireOldMessages();
+ if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
return;
+ } else {
+ flushToDisk();
}
}
- if (!hasSpace()) {
- if (isDiskListEmpty()) {
- expireOldMessages();
- if (hasSpace()) {
- memoryList.add(node);
- node.incrementReferenceCount();
- return;
- } else {
- flushToDisk();
- }
- }
- }
- if (systemUsage.getTempUsage().isFull()) {
- lock.unlock();
- systemUsage.getTempUsage().waitForSpace();
- lock.lock();
- }
- getDiskList().add(node);
- } finally {
- lock.unlock();
}
+ systemUsage.getTempUsage().waitForSpace();
+ getDiskList().add(node);
+
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
- + " last to FilePendingMessageCursor ", e);
+ + " first to FilePendingMessageCursor ", e);
throw new RuntimeException(e);
}
} else {
@@ -255,50 +208,32 @@
*
* @param node
*/
- public void addMessageFirst(MessageReference node) {
+ public synchronized void addMessageFirst(MessageReference node) {
if (!node.isExpired()) {
try {
- lock.lock();
- try {
- while (iterating) {
- lock.unlock();
- synchronized(this) {
- try {
- this.wait();
- } catch (InterruptedException ie) {}
- }
- lock.lock();
+ regionDestination = node.getMessage().getRegionDestination();
+ if (isDiskListEmpty()) {
+ if (hasSpace()) {
+ memoryList.addFirst(node);
+ node.incrementReferenceCount();
+ return;
}
- regionDestination =
node.getMessage().getRegionDestination();
+ }
+ if (!hasSpace()) {
if (isDiskListEmpty()) {
+ expireOldMessages();
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
+ } else {
+ flushToDisk();
}
}
- if (!hasSpace()) {
- if (isDiskListEmpty()) {
- expireOldMessages();
- if (hasSpace()) {
- memoryList.addFirst(node);
- node.incrementReferenceCount();
- return;
- } else {
- flushToDisk();
- }
- }
- }
- if (systemUsage.getTempUsage().isFull()) {
- lock.unlock();
- systemUsage.getTempUsage().waitForSpace();
- lock.lock();
- }
- node.decrementReferenceCount();
- getDiskList().addFirst(node);
- } finally {
- lock.unlock();
}
+ systemUsage.getTempUsage().waitForSpace();
+ node.decrementReferenceCount();
+ getDiskList().addFirst(node);
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
@@ -309,38 +244,25 @@
discard(node);
}
}
-
+
/**
* @return true if there pending messages to dispatch
*/
- public boolean hasNext() {
- boolean result;
- lock.lock();
- try {
- result = iter.hasNext();
- } finally {
- lock.unlock();
- }
- return result;
+ public synchronized boolean hasNext() {
+ return iter.hasNext();
}
/**
* @return the next pending message
*/
- public MessageReference next() {
- Message message;
- lock.lock();
- try {
- message = (Message)iter.next();
- last = message;
- if (!isDiskListEmpty()) {
- // got from disk
- message.setRegionDestination(regionDestination);
- message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
- message.incrementReferenceCount();
- }
- } finally {
- lock.unlock();
+ public synchronized MessageReference next() {
+ Message message = (Message)iter.next();
+ last = message;
+ if (!isDiskListEmpty()) {
+ // got from disk
+ message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ message.incrementReferenceCount();
}
return message;
}
@@ -348,15 +270,10 @@
/**
* remove the message at the cursor position
*/
- public void remove() {
- lock.lock();
- try {
- iter.remove();
- if (last != null) {
- last.decrementReferenceCount();
- }
- } finally {
- lock.unlock();
+ public synchronized void remove() {
+ iter.remove();
+ if (last != null) {
+ last.decrementReferenceCount();
}
}
@@ -364,61 +281,36 @@
* @param node
* @see
org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/
- public void remove(MessageReference node) {
- lock.lock();
- try {
- if (memoryList.remove(node)) {
- node.decrementReferenceCount();
- }
- if (!isDiskListEmpty()) {
- getDiskList().remove(node);
- }
- } finally {
- lock.unlock();
+ public synchronized void remove(MessageReference node) {
+ if (memoryList.remove(node)) {
+ node.decrementReferenceCount();
+ }
+ if (!isDiskListEmpty()) {
+ getDiskList().remove(node);
}
}
/**
* @return the number of pending messages
*/
- public int size() {
- int result;
- lock.lock();
- try {
- result = memoryList.size() + (isDiskListEmpty() ? 0 :
getDiskList().size());
- } finally {
- lock.unlock();
- }
- return result;
+ public synchronized int size() {
+ return memoryList.size() + (isDiskListEmpty() ? 0 :
getDiskList().size());
}
/**
* clear all pending messages
*/
- public void clear() {
- lock.lock();
- try {
- memoryList.clear();
- if (!isDiskListEmpty()) {
- getDiskList().clear();
- }
- last=null;
- } finally {
- lock.unlock();
+ public synchronized void clear() {
+ memoryList.clear();
+ if (!isDiskListEmpty()) {
+ getDiskList().clear();
}
+ last=null;
}
- public boolean isFull() {
- boolean result;
- lock.lock();
- try {
- // we always have space - as we can persist to disk
- // TODO: not necessarily true.
- result = false;
- } finally {
- lock.unlock();
- }
- return result;
+ public synchronized boolean isFull() {
+ // we always have space - as we can persist to disk
+ return false;
}
public boolean hasMessagesBufferedToDeliver() {
@@ -432,8 +324,7 @@
public void onUsageChanged(Usage usage, int oldPercentUsage,
int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
- lock.lock();
- try {
+ synchronized (this) {
flushRequired = true;
if (!iterating) {
expireOldMessages();
@@ -442,8 +333,6 @@
flushRequired = false;
}
}
- } finally {
- lock.unlock();
}
}
}
@@ -456,39 +345,31 @@
return hasSpace() && isDiskListEmpty();
}
- protected void expireOldMessages() {
- lock.lock();
- try {
- if (!memoryList.isEmpty()) {
- LinkedList<MessageReference> tmpList = new
LinkedList<MessageReference>(this.memoryList);
- this.memoryList = new LinkedList<MessageReference>();
- while (!tmpList.isEmpty()) {
- MessageReference node = tmpList.removeFirst();
- if (node.isExpired()) {
- discard(node);
- }else {
- memoryList.add(node);
- }
- }
+ protected synchronized void expireOldMessages() {
+ if (!memoryList.isEmpty()) {
+ LinkedList<MessageReference> tmpList = new
LinkedList<MessageReference>(this.memoryList);
+ this.memoryList = new LinkedList<MessageReference>();
+ while (!tmpList.isEmpty()) {
+ MessageReference node = tmpList.removeFirst();
+ if (node.isExpired()) {
+ discard(node);
+ }else {
+ memoryList.add(node);
+ }
}
- } finally {
- lock.unlock();
}
+
}
- protected void flushToDisk() {
- lock.lock();
- try {
- if (!memoryList.isEmpty()) {
- while (!memoryList.isEmpty()) {
- MessageReference node = memoryList.removeFirst();
- node.decrementReferenceCount();
- getDiskList().addLast(node);
- }
- memoryList.clear();
+ protected synchronized void flushToDisk() {
+
+ if (!memoryList.isEmpty()) {
+ while (!memoryList.isEmpty()) {
+ MessageReference node = memoryList.removeFirst();
+ node.decrementReferenceCount();
+ getDiskList().addLast(node);
}
- } finally {
- lock.unlock();
+ memoryList.clear();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=661295&r1=661294&r2=661295&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Thu May 29 04:27:33 2008
@@ -16,10 +16,12 @@
*/
package org.apache.activemq.broker.region.cursors;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,7 +89,7 @@
pendingCount = 0;
}
- public void addMessageLast(MessageReference node) throws Exception {
+ public synchronized void addMessageLast(MessageReference node) throws
Exception {
if (node != null) {
Message msg = node.getMessage();
if (started) {
@@ -102,7 +104,7 @@
}
}
- public void addMessageFirst(MessageReference node) throws Exception {
+ public synchronized void addMessageFirst(MessageReference node) throws
Exception {
if (node != null) {
Message msg = node.getMessage();
if (started) {
@@ -140,11 +142,6 @@
MessageReference result = currentCursor != null ? currentCursor.next()
: null;
return result;
}
-
- public synchronized void release() {
- nonPersistent.release();
- persistent.release();
- }
public synchronized void remove() {
if (currentCursor != null) {
@@ -162,7 +159,7 @@
pendingCount--;
}
- public void reset() {
+ public synchronized void reset() {
nonPersistent.reset();
persistent.reset();
}