Author: djencks
Date: Tue Jan 6 15:48:44 2009
New Revision: 732182
URL: http://svn.apache.org/viewvc?rev=732182&view=rev
Log:
Remove some dangerous method calls from the constructor (that did nothing),
make some fields final, remove duplicate start tracking, and finish generic
conversion
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=732182&r1=732181&r2=732182&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Tue Jan 6 15:48:44 2009
@@ -16,13 +16,13 @@
*/
package org.apache.activemq.broker.region.cursors;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -44,22 +44,19 @@
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor
{
private static final Log LOG =
LogFactory.getLog(StoreDurableSubscriberCursor.class);
- private String clientId;
- private String subscriberName;
- private Map<Destination, TopicStorePrefetch> topics = new
HashMap<Destination, TopicStorePrefetch>();
- private List<PendingMessageCursor> storePrefetches = new
CopyOnWriteArrayList<PendingMessageCursor>();
- private boolean started;
- private PendingMessageCursor nonPersistent;
+ private final String clientId;
+ private final String subscriberName;
+ private final Map<Destination, TopicStorePrefetch> topics = new
HashMap<Destination, TopicStorePrefetch>();
+ private final List<PendingMessageCursor> storePrefetches = new
CopyOnWriteArrayList<PendingMessageCursor>();
+ private final PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
- private Subscription subscription;
+ private final Subscription subscription;
/**
- * @param broker
- * @param topic
- * @param clientId
- * @param subscriberName
- * @param maxBatchSize
- * @param subscription
- * @throws IOException
+ * @param broker Broker for this cursor
+ * @param clientId clientId for this cursor
+ * @param subscriberName subscriber name for this cursor
+ * @param maxBatchSize currently ignored
+ * @param subscription subscription for this cursor
*/
public StoreDurableSubscriberCursor(Broker broker,String clientId, String
subscriberName,int maxBatchSize, Subscription subscription) {
this.subscription=subscription;
@@ -70,17 +67,14 @@
}else {
this.nonPersistent = new VMPendingMessageCursor();
}
- this.nonPersistent.setMaxBatchSize(getMaxBatchSize());
+ //TODO is this correct? we are ignoring the constructor parameter
maxBatchSize
+// this.nonPersistent.setMaxBatchSize(getMaxBatchSize());
this.nonPersistent.setSystemUsage(systemUsage);
- this.nonPersistent.setEnableAudit(isEnableAudit());
- this.nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
- this.nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
this.storePrefetches.add(this.nonPersistent);
}
public synchronized void start() throws Exception {
- if (!started) {
- started = true;
+ if (!isStarted()) {
super.start();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.setMessageAudit(getMessageAudit());
@@ -90,8 +84,7 @@
}
public synchronized void stop() throws Exception {
- if (started) {
- started = false;
+ if (isStarted()) {
super.stop();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.stop();
@@ -116,7 +109,7 @@
tsp.setMaxProducersToAudit(getMaxProducersToAudit());
topics.put(destination, tsp);
storePrefetches.add(tsp);
- if (started) {
+ if (isStarted()) {
tsp.start();
}
}
@@ -130,7 +123,7 @@
* @throws Exception
*/
public synchronized List<MessageReference> remove(ConnectionContext
context, Destination destination) throws Exception {
- Object tsp = topics.remove(destination);
+ PendingMessageCursor tsp = topics.remove(destination);
if (tsp != null) {
storePrefetches.remove(tsp);
}
@@ -161,7 +154,7 @@
* Informs the Broker if the subscription needs to intervention to recover
* it's state e.g. DurableTopicSubscriber may do
*
- * @see org.apache.activemq.region.cursors.PendingMessageCursor
+ * @see
org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired() {
@@ -171,7 +164,7 @@
public synchronized void addMessageLast(MessageReference node) throws
Exception {
if (node != null) {
Message msg = node.getMessage();
- if (started) {
+ if (isStarted()) {
if (!msg.isPersistent()) {
nonPersistent.addMessageLast(node);
}
@@ -228,16 +221,14 @@
}
public synchronized void reset() {
- for (Iterator<PendingMessageCursor> i = storePrefetches.iterator();
i.hasNext();) {
- AbstractPendingMessageCursor tsp =
(AbstractPendingMessageCursor)i.next();
- tsp.reset();
+ for (PendingMessageCursor storePrefetch : storePrefetches) {
+ storePrefetch.reset();
}
}
public synchronized void release() {
- for (Iterator<PendingMessageCursor> i = storePrefetches.iterator();
i.hasNext();) {
- AbstractPendingMessageCursor tsp =
(AbstractPendingMessageCursor)i.next();
- tsp.release();
+ for (PendingMessageCursor storePrefetch : storePrefetches) {
+ storePrefetch.release();
}
}
@@ -250,24 +241,21 @@
}
public void setMaxBatchSize(int maxBatchSize) {
- for (Iterator<PendingMessageCursor> i = storePrefetches.iterator();
i.hasNext();) {
- AbstractPendingMessageCursor tsp =
(AbstractPendingMessageCursor)i.next();
- tsp.setMaxBatchSize(maxBatchSize);
+ for (PendingMessageCursor storePrefetch : storePrefetches) {
+ storePrefetch.setMaxBatchSize(maxBatchSize);
}
super.setMaxBatchSize(maxBatchSize);
}
public synchronized void gc() {
- for (Iterator<PendingMessageCursor> i = storePrefetches.iterator();
i.hasNext();) {
- PendingMessageCursor tsp = i.next();
+ for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
}
}
public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
- for (Iterator<PendingMessageCursor> i = storePrefetches.iterator();
i.hasNext();) {
- PendingMessageCursor tsp = i.next();
+ for (PendingMessageCursor tsp : storePrefetches) {
tsp.setSystemUsage(usageManager);
}
}
@@ -303,8 +291,7 @@
protected synchronized PendingMessageCursor getNextCursor() throws
Exception {
if (currentCursor == null || currentCursor.isEmpty()) {
currentCursor = null;
- for (Iterator<PendingMessageCursor> i =
storePrefetches.iterator(); i.hasNext();) {
- AbstractPendingMessageCursor tsp =
(AbstractPendingMessageCursor)i.next();
+ for (PendingMessageCursor tsp : storePrefetches) {
if (tsp.hasNext()) {
currentCursor = tsp;
break;