Author: rajdavies
Date: Thu Feb 7 04:55:02 2008
New Revision: 619387
URL: http://svn.apache.org/viewvc?rev=619387&view=rev
Log:
cursor fixes
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Thu Feb 7 04:55:02 2008
@@ -165,7 +165,6 @@
// the destination and that they should un-subscribe.. Then wait up
// to timeout time before
// dropping the subscription.
-
}
LOG.debug("Removing destination: " + destination);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Feb 7 04:55:02 2008
@@ -37,9 +37,11 @@
protected final MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
private int maxProducersToAudit=1024;
- private int maxAuditDepth=1;
+ private int maxAuditDepth=2048;
private boolean enableAudit=true;
private int maxPageSize=1000;
+ private boolean useCache=true;
+ private int minimumMessageSize=1024;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
/**
@@ -160,5 +162,21 @@
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
+ }
+
+ public boolean isUseCache() {
+ return useCache;
+ }
+
+ public void setUseCache(boolean useCache) {
+ this.useCache = useCache;
+ }
+
+ public int getMinimumMessageSize() {
+ return minimumMessageSize;
+ }
+
+ public void setMinimumMessageSize(int minimumMessageSize) {
+ this.minimumMessageSize = minimumMessageSize;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Thu Feb 7 04:55:02 2008
@@ -88,4 +88,12 @@
int getMaxPageSize();
public void setMaxPageSize(int maxPageSize);
+
+ public boolean isUseCache();
+
+ public void setUseCache(boolean useCache);
+
+ public int getMinimumMessageSize();
+
+ public void setMinimumMessageSize(int minimumMessageSize);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Thu Feb 7 04:55:02 2008
@@ -153,7 +153,6 @@
public boolean isEnableAudit() {
return next.isEnableAudit();
}
-
public void setEnableAudit(boolean enableAudit) {
next.setEnableAudit(enableAudit);
@@ -179,4 +178,20 @@
public void setMaxPageSize(int maxPageSize) {
next.setMaxPageSize(maxPageSize);
}
+
+ public boolean isUseCache() {
+ return next.isUseCache();
+ }
+
+ public void setUseCache(boolean useCache) {
+ next.setUseCache(useCache);
+ }
+
+ public int getMinimumMessageSize() {
+ return next.getMinimumMessageSize();
+ }
+
+ public void setMinimumMessageSize(int minimumMessageSize) {
+ next.setMinimumMessageSize(minimumMessageSize);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Feb 7 04:55:02 2008
@@ -111,6 +111,7 @@
messages.setEnableAudit(isEnableAudit());
messages.setMaxAuditDepth(getMaxAuditDepth());
messages.setMaxProducersToAudit(getMaxProducersToAudit());
+ messages.setUseCache(isUseCache());
if (messages.isRecoveryRequired()) {
store.recover(new MessageRecoveryListener() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Thu Feb 7 04:55:02 2008
@@ -36,10 +36,10 @@
protected int maxBatchSize = 100;
protected SystemUsage systemUsage;
protected int maxProducersToAudit=1024;
- protected int maxAuditDepth=1;
+ protected int maxAuditDepth=1000;
protected boolean enableAudit=true;
protected ActiveMQMessageAudit audit;
- protected boolean useCache=true;
+ protected boolean useCache=false;
private boolean started=false;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Feb 7 04:55:02 2008
@@ -16,10 +16,8 @@
*/
package org.apache.activemq.broker.region.cursors;
-import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.region.Destination;
@@ -38,7 +36,6 @@
*/
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
- protected static final int MAX_FILL_ATTEMPTS=3;
protected final Destination regionDestination;
protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
protected boolean cacheEnabled=false;
@@ -52,15 +49,15 @@
public final synchronized void start() throws Exception{
if (!isStarted()) {
+ super.start();
+ clear();
+ resetBatch();
this.size = getStoreSize();
this.storeHasMessages=this.size > 0;
if (!this.storeHasMessages&&useCache) {
cacheEnabled=true;
}
- }
- super.start();
- clear();
- resetBatch();
+ }
getSystemUsage().getMemoryUsage().addUsageListener(this);
}
@@ -181,10 +178,8 @@
resetBatch();
this.batchResetNeeded = false;
}
- //we may have to move the store cursor past messages that have
- //already been delivered - but we also don't want it to spin
- int fillAttempts=0;
- while (fillAttempts < MAX_FILL_ATTEMPTS && this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
+
+ if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
this.storeHasMessages = false;
try {
doFillBatch();
@@ -195,7 +190,6 @@
if (!this.batchList.isEmpty()) {
this.storeHasMessages=true;
}
- fillAttempts++;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Thu Feb 7 04:55:02 2008
@@ -121,7 +121,7 @@
public synchronized boolean hasNext() {
- boolean result = pendingCount > 0;
+ boolean result = true;//pendingCount > 0;
if (result) {
try {
currentCursor = getNextCursor();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Feb 7 04:55:02 2008
@@ -51,12 +51,14 @@
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
private int maxProducersToAudit=32;
- private int maxAuditDepth=1024;
- private int maxQueueAuditDepth=1;
+ private int maxAuditDepth=2048;
+ private int maxQueueAuditDepth=2048;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
private int maxPageSize=1000;
+ private boolean useCache=true;
+ private long minimumMessageSize=1024;
public void configure(Broker broker,Queue queue) {
if (dispatchPolicy != null) {
@@ -78,6 +80,8 @@
queue.setMaxAuditDepth(getMaxQueueAuditDepth());
queue.setMaxProducersToAudit(getMaxProducersToAudit());
queue.setMaxPageSize(getMaxPageSize());
+ queue.setUseCache(isUseCache());
+ queue.setMinimumMessageSize((int) getMinimumMessageSize());
}
public void configure(Topic topic) {
@@ -99,6 +103,8 @@
topic.setMaxAuditDepth(getMaxAuditDepth());
topic.setMaxProducersToAudit(getMaxProducersToAudit());
topic.setMaxPageSize(getMaxPageSize());
+ topic.setUseCache(isUseCache());
+ topic.setMinimumMessageSize((int) getMinimumMessageSize());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -360,5 +366,24 @@
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
+
+ public boolean isUseCache() {
+ return useCache;
+ }
+
+ public void setUseCache(boolean useCache) {
+ this.useCache = useCache;
+ }
+
+ public long getMinimumMessageSize() {
+ return minimumMessageSize;
+ }
+
+ /**
+ * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+ */
+ public void setMinimumMessageSize(long minimumMessageSize) {
+ this.minimumMessageSize = minimumMessageSize;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
Thu Feb 7 04:55:02 2008
@@ -138,7 +138,7 @@
public int getSize() {
if (size == 0 && content == null && text != null) {
- size = AVERAGE_MESSAGE_SIZE_OVERHEAD;
+ size = getMinimumMessageSize();
if (marshalledProperties != null) {
size += marshalledProperties.getLength();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
Thu Feb 7 04:55:02 2008
@@ -25,6 +25,7 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
@@ -41,7 +42,10 @@
*/
public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
- public static final int AVERAGE_MESSAGE_SIZE_OVERHEAD = 8 * 1024;
+ /**
+ * The default minimum amount of memory a message is assumed to use
+ */
+ public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
protected MessageId messageId;
protected ActiveMQDestination originalDestination;
@@ -620,8 +624,9 @@
}
public int getSize() {
- if (size <= AVERAGE_MESSAGE_SIZE_OVERHEAD) {
- size = AVERAGE_MESSAGE_SIZE_OVERHEAD;
+ int minimumMessageSize = getMinimumMessageSize();
+ if (size < minimumMessageSize) {
+ size = minimumMessageSize;
if (marshalledProperties != null) {
size += marshalledProperties.getLength();
}
@@ -630,6 +635,16 @@
}
}
return size;
+ }
+
+ protected int getMinimumMessageSize() {
+ int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
+ //let destination override
+ Destination dest = regionDestination;
+ if (dest != null) {
+ result=dest.getMinimumMessageSize();
+ }
+ return result;
}
/**
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Thu Feb 7 04:55:02 2008
@@ -525,44 +525,12 @@
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
- RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
- this, listener);
- if (referenceStore.supportsExternalBatchControl()) {
- lock.lock();
- try {
- referenceStore.recoverNextMessages(maxReturned,
- recoveryListener);
- if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
- int count = 0;
- Iterator<Entry<MessageId, ReferenceData>> iterator = messages
- .entrySet().iterator();
- while (iterator.hasNext() && count < maxReturned
- && recoveryListener.hasSpace()) {
- Entry<MessageId, ReferenceData> entry = iterator.next();
- ReferenceData data = entry.getValue();
- Message message = getMessage(data);
- recoveryListener.recoverMessage(message);
- count++;
- }
- referenceStore.setBatch(recoveryListener
- .getLastRecoveredMessageId());
- }
- }finally {
- lock.unlock();
- }
- } else {
- flush();
- referenceStore.recoverNextMessages(maxReturned, recoveryListener);
- }
- /*
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
}
- */
-
}
Message getMessage(ReferenceData data) throws IOException {