Author: rajdavies
Date: Thu Oct 18 07:08:44 2007
New Revision: 585967
URL: http://svn.apache.org/viewvc?rev=585967&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1467
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Oct 18 07:08:44 2007
@@ -123,7 +123,7 @@
private List<ProxyConnector> proxyConnectors = new
CopyOnWriteArrayList<ProxyConnector>();
private List<ObjectName> registeredMBeanNames = new
CopyOnWriteArrayList<ObjectName>();
private List<JmsConnector> jmsConnectors = new
CopyOnWriteArrayList<JmsConnector>();
- private Service[] services;
+ private List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
@@ -449,8 +449,7 @@
removeShutdownHook();
ServiceStopper stopper = new ServiceStopper();
if (services != null) {
- for (int i = 0; i < services.length; i++) {
- Service service = services[i];
+ for (Service service: services) {
stopper.stop(service);
}
}
@@ -647,6 +646,7 @@
systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); //
Default 64 Meg
systemUsage.getTempUsage().setLimit(1024 * 1024 * 1024 * 100);
// 10 Gb
systemUsage.getStoreUsage().setLimit(1024 * 1024 * 1024 *
100); // 100 GB
+ addService(this.systemUsage);
}
return systemUsage;
} catch (IOException e) {
@@ -656,7 +656,11 @@
}
public void setSystemUsage(SystemUsage memoryManager) {
+ if (this.systemUsage != null) {
+ removeService(this.systemUsage);
+ }
this.systemUsage = memoryManager;
+ addService(this.systemUsage);
}
/**
@@ -667,6 +671,7 @@
if (consumerSystemUsage == null) {
consumerSystemUsage = new SystemUsage(getSystemUsage(),
"Consumer");
consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
+ addService(consumerSystemUsage);
}
return consumerSystemUsage;
}
@@ -675,7 +680,11 @@
* @param consumerUsageManager the consumerUsageManager to set
*/
public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
+ if (this.consumerSystemUsage != null) {
+ removeService(this.consumerSystemUsage);
+ }
this.consumerSystemUsage = consumerUsageManager;
+ addService(this.producerSystemUsage);
}
/**
@@ -686,6 +695,7 @@
if (producerSystemUsage == null) {
producerSystemUsage = new SystemUsage(getSystemUsage(),
"Producer");
producerSystemUsage.getMemoryUsage().setUsagePortion(0.45f);
+ addService(producerSystemUsage);
}
return producerSystemUsage;
}
@@ -694,7 +704,11 @@
* @param producerUsageManager the producerUsageManager to set
*/
public void setProducerSystemUsage(SystemUsage producerUsageManager) {
+ if (this.producerSystemUsage != null) {
+ removeService(this.producerSystemUsage);
+ }
this.producerSystemUsage = producerUsageManager;
+ addService(this.producerSystemUsage);
}
public PersistenceAdapter getPersistenceAdapter() throws IOException {
@@ -831,7 +845,7 @@
}
public Service[] getServices() {
- return services;
+ return (Service[]) services.toArray();
}
/**
@@ -839,7 +853,12 @@
* {@link MasterConnector}
*/
public void setServices(Service[] services) {
- this.services = services;
+ this.services.clear();
+ if (services != null) {
+ for (int i=0; i < services.length;i++) {
+ this.services.add(services[i]);
+ }
+ }
}
/**
@@ -847,15 +866,11 @@
* lifecycle
*/
public void addService(Service service) {
- if (services == null) {
- services = new Service[] {service};
- } else {
- int length = services.length;
- Service[] temp = new Service[length + 1];
- System.arraycopy(services, 1, temp, 1, length);
- temp[length] = service;
- services = temp;
- }
+ services.add(service);
+ }
+
+ public void removeService(Service service) {
+ services.remove(service);
}
public boolean isUseLoggingForShutdownErrors() {
@@ -1676,13 +1691,9 @@
JmsConnector connector = iter.next();
connector.start();
}
-
- if (services != null) {
- for (int i = 0; i < services.length; i++) {
- Service service = services[i];
- configureService(service);
- service.start();
- }
+ for (Service service:services) {
+ configureService(service);
+ service.start();
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
Thu Oct 18 07:08:44 2007
@@ -32,6 +32,9 @@
private Region region;
private ProducerState producerState;
private boolean mutable = true;
+
+ public ProducerBrokerExchange() {
+ }
/**
* @return the connectionContext
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Oct 18 07:08:44 2007
@@ -46,6 +46,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
@@ -613,6 +614,7 @@
ProducerBrokerExchange producerExchange = new
ProducerBrokerExchange();
producerExchange.setMutable(false);
producerExchange.setConnectionContext(context);
+ producerExchange.setProducerState(new
ProducerState(new ProducerInfo()));
context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
Thu Oct 18 07:08:44 2007
@@ -29,11 +29,11 @@
/**
* Constructs an empty list.
+ * @param header
*/
public VMIndexLinkedList(IndexItem header) {
this.root = header;
- this.root.next = root;
- root.prev = root;
+ this.root.next=this.root.prev=this.root;
}
public synchronized IndexItem getRoot() {
@@ -144,8 +144,7 @@
* @see org.apache.activemq.kaha.impl.IndexLinkedList#clear()
*/
public synchronized void clear() {
- root.next = root;
- root.prev = root;
+ root.next=root.prev=root;
size = 0;
}
@@ -258,12 +257,7 @@
if (e == root || e.equals(root)) {
return;
}
- if (e.prev==null){
- e.prev=root;
- }
- if (e.next==null){
- e.next=root;
- }
+
e.prev.next = e.next;
e.next.prev = e.prev;
size--;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
Thu Oct 18 07:08:44 2007
@@ -21,6 +21,12 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
@@ -49,6 +55,8 @@
private List<T> children = new CopyOnWriteArrayList<T>();
private final List<Runnable> callbacks = new LinkedList<Runnable>();
private int pollingTime = 100;
+ private ThreadPoolExecutor executor;
+ private AtomicBoolean started=new AtomicBoolean();
public Usage(T parent, String name, float portion) {
this.parent = parent;
@@ -233,25 +241,35 @@
return (int)((((retrieveUsage() * 100) / limiter.getLimit()) /
percentUsageMinDelta) * percentUsageMinDelta);
}
- private void fireEvent(int oldPercentUsage, int newPercentUsage) {
+ private void fireEvent(final int oldPercentUsage, final int
newPercentUsage) {
if (debug) {
LOG.debug("Memory usage change. from: " + oldPercentUsage + ",
to: " + newPercentUsage);
}
- // Switching from being full to not being full..
- if (oldPercentUsage >= 100 && newPercentUsage < 100) {
- synchronized (usageMutex) {
- usageMutex.notifyAll();
- for (Iterator<Runnable> iter = new
ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
- Runnable callback = iter.next();
- callback.run();
+ if (started.get()) {
+ // Switching from being full to not being full..
+ if (oldPercentUsage >= 100 && newPercentUsage < 100) {
+ synchronized (usageMutex) {
+ usageMutex.notifyAll();
+ for (Iterator<Runnable> iter = new
ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
+ Runnable callback = iter.next();
+ callback.run();
+ }
+ callbacks.clear();
}
- callbacks.clear();
}
- }
- // Let the listeners know
- for (Iterator<UsageListener> iter = listeners.iterator();
iter.hasNext();) {
- UsageListener l = iter.next();
- l.onUsageChanged(this, oldPercentUsage, newPercentUsage);
+ // Let the listeners know on a separate thread
+ Runnable listenerNotifier = new Runnable() {
+
+ public void run() {
+ for (Iterator<UsageListener> iter = listeners.iterator();
iter.hasNext();) {
+ UsageListener l = iter.next();
+ l.onUsageChanged(Usage.this, oldPercentUsage,
newPercentUsage);
+ }
+ }
+
+ };
+ listenerNotifier.run();
+ //getExecutor().execute(listenerNotifier);
}
}
@@ -264,21 +282,46 @@
}
@SuppressWarnings("unchecked")
- public void start() {
- if (parent != null) {
- parent.addChild(this);
+ public synchronized void start() {
+ if (started.compareAndSet(false, true)){
+ if (parent != null) {
+ parent.addChild(this);
+ }
+ for (T t:children) {
+ t.start();
+ }
}
}
@SuppressWarnings("unchecked")
- public void stop() {
- if (parent != null) {
- parent.removeChild(this);
+ public synchronized void stop() {
+ if (started.compareAndSet(true, false)){
+ if (parent != null) {
+ parent.removeChild(this);
+ }
+ if (this.executor != null){
+ this.executor.shutdownNow();
+ }
+ //clear down any callbacks
+ synchronized (usageMutex) {
+ usageMutex.notifyAll();
+ for (Iterator<Runnable> iter = new
ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
+ Runnable callback = iter.next();
+ callback.run();
+ }
+ this.callbacks.clear();
+ }
+ for (T t:children) {
+ t.stop();
+ }
}
}
private void addChild(T child) {
children.add(child);
+ if (started.get()) {
+ child.start();
+ }
}
private void removeChild(T child) {
@@ -356,5 +399,22 @@
public void setParent(T parent) {
this.parent = parent;
+ }
+
+ protected synchronized Executor getExecutor() {
+ if (this.executor == null) {
+ this.executor = new ThreadPoolExecutor(1, 1, 0,
+ TimeUnit.NANOSECONDS,
+ new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, getName()
+ + " Usage Thread Pool");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+
+ }
+ return this.executor;
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
Thu Oct 18 07:08:44 2007
@@ -34,7 +34,9 @@
protected void setUp() throws Exception {
super.setUp();
for (int i = 0; i < NUMBER; i++) {
- testData.add(new IndexItem());
+ IndexItem item = new IndexItem();
+ item.setOffset(i);
+ testData.add(item);
}
root = new IndexItem();
list = new VMIndexLinkedList(root);