Author: chirino
Date: Fri Mar 9 07:59:14 2007
New Revision: 516444
URL: http://svn.apache.org/viewvc?view=rev&rev=516444
Log:
Refactor so that the ProducerBrokerExchange is passed all the way down to the
Topic and Queue implementations.
This is laying the ground work to implement window based producer flow control.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Fri Mar 9 07:59:14 2007
@@ -303,7 +303,7 @@
producerExchange.setRegionDestination(regionDestination);
}
- producerExchange.getRegionDestination().send(context, messageSend);
+ producerExchange.getRegionDestination().send(producerExchange,
messageSend);
}
public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck
ack) throws Exception{
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Fri Mar 9 07:59:14 2007
@@ -21,6 +21,7 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -36,7 +37,7 @@
void addSubscription(ConnectionContext context, Subscription sub) throws
Exception;
void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception;
- void send(ConnectionContext context, Message messageSend) throws Exception;
+ void send(ProducerBrokerExchange producerExchange, Message messageSend)
throws Exception;
boolean lock(MessageReference node, LockOwner lockOwner);
void acknowledge(ConnectionContext context, Subscription sub, final
MessageAck ack, final MessageReference node) throws IOException;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Fri Mar 9 07:59:14 2007
@@ -19,6 +19,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -89,7 +90,7 @@
next.removeSubscription(context, sub);
}
- public void send(ConnectionContext context, Message messageSend) throws
Exception {
+ public void send(ProducerBrokerExchange context, Message messageSend)
throws Exception {
next.send(context, messageSend);
}
@@ -104,8 +105,8 @@
/**
* Sends a message to the given destination which may be a wildcard
*/
- protected void send(ConnectionContext context, Message message,
ActiveMQDestination destination) throws Exception {
- Broker broker = context.getBroker();
+ protected void send(ProducerBrokerExchange context, Message message,
ActiveMQDestination destination) throws Exception {
+ Broker broker = context.getConnectionContext().getBroker();
Set destinations = broker.getDestinations(destination);
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar 9 07:59:14 2007
@@ -23,9 +23,12 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@@ -316,7 +319,8 @@
}
- public void send(final ConnectionContext context,final Message message)
throws Exception{
+ public void send(final ProducerBrokerExchange producerExchange,final
Message message) throws Exception {
+ final ConnectionContext context =
producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if(message.isExpired()){
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Mar 9 07:59:14 2007
@@ -236,8 +236,9 @@
- public void send(final ConnectionContext context, final Message message)
throws Exception {
-
+ public void send(final ProducerBrokerExchange producerExchange, final
Message message) throws Exception {
+ final ConnectionContext context =
producerExchange.getConnectionContext();
+
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if( message.isExpired() ) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
Fri Mar 9 07:59:14 2007
@@ -17,16 +17,16 @@
*/
package org.apache.activemq.broker.region.virtual;
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.MessageEvaluationContext;
-import java.util.Collection;
-import java.util.Iterator;
-
/**
* Represents a composite [EMAIL PROTECTED] Destination} where send()s are
replicated to
* each Destination instance.
@@ -46,7 +46,7 @@
this.copyMessage = copyMessage;
}
- public void send(ConnectionContext context, Message message) throws
Exception {
+ public void send(ProducerBrokerExchange context, Message message) throws
Exception {
MessageEvaluationContext messageContext = null;
for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
Fri Mar 9 07:59:14 2007
@@ -16,18 +16,18 @@
*/
package org.apache.activemq.broker.region.virtual;
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.DestinationMap;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
/**
* Implements <a
* href="http://activemq.apache.org/virtual-destinations.html">Virtual
@@ -77,7 +77,7 @@
protected Destination createCompositeDestination(Destination destination,
final List destinations) {
return new DestinationFilter(destination) {
- public void send(ConnectionContext context, Message messageSend)
throws Exception {
+ public void send(ProducerBrokerExchange context, Message
messageSend) throws Exception {
for (Iterator iter = destinations.iterator(); iter.hasNext();)
{
Destination destination = (Destination) iter.next();
destination.send(context, messageSend);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
Fri Mar 9 07:59:14 2007
@@ -17,7 +17,7 @@
*/
package org.apache.activemq.broker.region.virtual;
-import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
@@ -41,7 +41,7 @@
this.postfix = postfix;
}
- public void send(ConnectionContext context, Message message) throws
Exception {
+ public void send(ProducerBrokerExchange context, Message message) throws
Exception {
ActiveMQDestination queueConsumers =
getQueueConsumersWildcard(message.getDestination());
send(context, message, queueConsumers);
}