[JBoss-dev] CVS update: jbossmq/src/main/org/jboss/mq/server BasicQueue.java ClientConsumer.java
User: chirino Date: 01/12/08 21:56:36 Modified:src/main/org/jboss/mq/server BasicQueue.java ClientConsumer.java Log: updated the dc variable to connectionToken. more readable. fixed the NoLocal bug that was reported on the user forums. testcase in the suite shows now that it is working. Revision ChangesPath 1.10 +5 -4 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java Index: BasicQueue.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- BasicQueue.java 2001/11/14 01:53:40 1.9 +++ BasicQueue.java 2001/12/09 05:56:36 1.10 @@ -31,7 +31,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @createdAugust 16, 2001 - * @version$Revision: 1.9 $ + * @version$Revision: 1.10 $ */ //abstract public class BasicQueue implements Runnable { public class BasicQueue { @@ -144,9 +144,9 @@ public SpyMessage receive(Subscription sub, boolean wait) throws JMSException { MessageReference messageRef = null; - Selector selector = sub.getSelector(); synchronized (receivers) { - if (selector == null) { + // If the subscription is not picky, the first message will be it + if (sub.getSelector() == null && sub.noLocal==false ) { synchronized (messages) { if (messages.size() != 0) { messageRef = (MessageReference)messages.first(); @@ -154,11 +154,12 @@ } } } else { + // The subscription is picky, so we have to iterate. synchronized (messages) { Iterator i = messages.iterator(); while (i.hasNext()) { MessageReference mr = (MessageReference) i.next(); - if (selector.test(mr.getHeaders())) { + if (sub.accepts(mr.getHeaders())) { messageRef = mr; i.remove(); break; 1.10 +9 -9 jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java Index: ClientConsumer.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- ClientConsumer.java 2001/11/26 06:33:12 1.9 +++ ClientConsumer.java 2001/12/09 05:56:36 1.10 @@ -33,7 +33,7 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * @createdAugust 16, 2001 - * @version$Revision: 1.9 $ + * @version$Revision: 1.10 $ */ public class ClientConsumer implements Work { @@ -41,7 +41,7 @@ //The JMSServer object JMSServer server; //The connection this queue will send messages over - ConnectionToken dc; + ConnectionToken connectionToken; //Is this connection enabled (Can we transmit to the receiver) boolean enabled; //Has this connection been closed? @@ -73,11 +73,11 @@ // Constructor --- - public ClientConsumer(JMSServer server, ConnectionToken dc) throws JMSException + public ClientConsumer(JMSServer server, ConnectionToken connectionToken) throws JMSException { this.server = server; - this.dc = dc; - log = Logger.getLogger(ClientConsumer.class.getName() + ":" + dc.getClientID()); + this.connectionToken = connectionToken; + log = Logger.getLogger(ClientConsumer.class.getName() + ":" + connectionToken.getClientID()); // Create thread pool synchronized (ClientConsumer.class) { @@ -130,7 +130,7 @@ { if( log.isTraceEnabled() ) log.trace("Adding subscription for: " + req); - req.dc = dc; + req.connectionToken = connectionToken; req.clientConsumer = this; JMSDestination jmsdest = (JMSDestination) server.getJMSDestination(req.destination); @@ -272,7 +272,7 @@ } - dc.clientIL.receive(job); + connectionToken.clientIL.receive(job); } catch(Exception e) @@ -280,7 +280,7 @@ log.warn("Could not send messages to a receiver.", e); try { -server.connectionFailure(dc); +server.connectionFailure(connectionToken); } catch(Throwable ignore) { @@ -291,7 +291,7 @@ public String toString() { - return "ClientConsumer:" + dc.getClientID(); + return "Cl
[JBoss-dev] CVS update: jbossmq/src/main/org/jboss/mq/server BasicQueue.java ClientConsumer.java JMSDestination.java JMSServer.java
User: starksm Date: 01/11/27 22:15:32 Modified:src/main/org/jboss/mq/server Tag: Branch_2_4 BasicQueue.java ClientConsumer.java JMSDestination.java JMSServer.java Log: Complete switch to org.jboss.logging.Logger and use trace to reduce default logging Add support for package version manifest headers Revision ChangesPath No revision No revision 1.4.2.3 +207 -128 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java Index: BasicQueue.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v retrieving revision 1.4.2.2 retrieving revision 1.4.2.3 diff -u -r1.4.2.2 -r1.4.2.3 --- BasicQueue.java 2001/10/22 20:38:51 1.4.2.2 +++ BasicQueue.java 2001/11/28 06:15:32 1.4.2.3 @@ -18,6 +18,7 @@ import javax.jms.Destination; import javax.jms.JMSException; +import org.jboss.logging.Logger; import org.jboss.mq.*; import org.jboss.mq.pm.PersistenceManager; import org.jboss.mq.selectors.Selector; @@ -30,10 +31,12 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @createdAugust 16, 2001 - * @version$Revision: 1.4.2.2 $ + * @version$Revision: 1.4.2.3 $ */ //abstract public class BasicQueue implements Runnable { -public class BasicQueue { +public class BasicQueue +{ + static Logger log = Logger.getLogger( BasicQueue.class ); //List of messages waiting to be dispatched SortedSetmessages = new TreeSet(); //The JMSServer object @@ -43,123 +46,157 @@ //List of messages that should be acked or else returned to thier //owning exclusive queues. HashMap unacknowledgedMessages = new HashMap(); - + HashMap removedSubscribers = new HashMap(); - - static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance( BasicQueue.class ); - + // Constructor --- public BasicQueue( JMSServer server ) - throws JMSException { + throws JMSException + { this.server = server; } - - public void clientConsumerStopped( ClientConsumer clientConsumer ) { + + public void clientConsumerStopped( ClientConsumer clientConsumer ) + { //remove all waiting subs for this clientConsumer and send to its blocked list. - synchronized ( receivers ) { - for ( Iterator it = receivers.iterator(); it.hasNext(); ) { + synchronized ( receivers ) + { + for ( Iterator it = receivers.iterator(); it.hasNext(); ) + { Subscription sub = ( Subscription )it.next(); -if ( sub.clientConsumer.equals( clientConsumer ) ) { +if ( sub.clientConsumer.equals( clientConsumer ) ) +{ clientConsumer.addBlockedSubscription( sub ); it.remove(); } } } } - + //Used to put a message that was added previously to the queue, back in the queue - public void restoreMessage( SpyMessage mes ) { + public void restoreMessage( SpyMessage mes ) + { internalAddMessage( mes ); } - - + + public SpyMessage[] browse( String selector ) - throws JMSException { - - if ( selector == null ) { + throws JMSException + { + + if ( selector == null ) + { SpyMessage list[]; - synchronized ( messages ) { + synchronized ( messages ) + { list = new SpyMessage[messages.size()]; list = ( SpyMessage[] )messages.toArray( list ); } return list; - } else { + } + else + { Selector s = new Selector( selector ); LinkedList selection = new LinkedList(); - - synchronized ( messages ) { + + synchronized ( messages ) + { Iterator i = messages.iterator(); -while ( i.hasNext() ) { +while ( i.hasNext() ) +{ SpyMessage m = ( SpyMessage )i.next(); - if ( s.test( m ) ) { + if ( s.test( m ) ) + { selection.add( m ); } } } - + SpyMessage list[]; list = new SpyMessage[selection.size()]; list = ( SpyMessage[] )selection.toArray( list ); return list; } } - - public void addReceiver( Subscription sub ) { - synchronized ( messages ) { - if ( messages.size() != 0 ) { -for ( Iterator
[JBoss-dev] CVS update: jbossmq/src/main/org/jboss/mq/server BasicQueue.java ClientConsumer.java JBossMQService.java JBossMQServiceMBean.java JMSDestination.java JMSQueue.java JMSServer.java JMSServerMBean.java JMSTopic.java PersistentQueue.java QueueManager.java QueueManagerMBean.java StateManager.java StateManagerMBean.java TopicManager.java TopicManagerMBean.java
User: chirino Date: 01/08/22 20:57:12 Added: src/main/org/jboss/mq/server Tag: Branch_2_4 BasicQueue.java ClientConsumer.java JBossMQService.java JBossMQServiceMBean.java JMSDestination.java JMSQueue.java JMSServer.java JMSServerMBean.java JMSTopic.java PersistentQueue.java QueueManager.java QueueManagerMBean.java StateManager.java StateManagerMBean.java TopicManager.java TopicManagerMBean.java Log: Back porting JBossMQ 1.0.0 BETA 1 to 2_4 branch Revision ChangesPath No revision No revision 1.4.2.1 +1 -1 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java Index: BasicQueue.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v retrieving revision 1.4 retrieving revision 1.4.2.1 diff -u -r1.4 -r1.4.2.1 --- BasicQueue.java 2001/08/17 03:04:06 1.4 +++ BasicQueue.java 2001/08/23 03:57:12 1.4.2.1 @@ -30,7 +30,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @createdAugust 16, 2001 - * @version$Revision: 1.4 $ + * @version$Revision: 1.4.2.1 $ */ //abstract public class BasicQueue implements Runnable { public class BasicQueue { 1.4.2.1 +1 -1 jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java Index: ClientConsumer.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v retrieving revision 1.4 retrieving revision 1.4.2.1 diff -u -r1.4 -r1.4.2.1 --- ClientConsumer.java 2001/08/17 03:04:06 1.4 +++ ClientConsumer.java 2001/08/23 03:57:12 1.4.2.1 @@ -25,7 +25,7 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * @createdAugust 16, 2001 - * @version$Revision: 1.4 $ + * @version$Revision: 1.4.2.1 $ */ public class ClientConsumer implements Runnable { //The JMSServer object 1.3.2.1 +1 -1 jbossmq/src/main/org/jboss/mq/server/JBossMQService.java Index: JBossMQService.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JBossMQService.java,v retrieving revision 1.3 retrieving revision 1.3.2.1 diff -u -r1.3 -r1.3.2.1 --- JBossMQService.java 2001/08/18 00:07:49 1.3 +++ JBossMQService.java 2001/08/23 03:57:12 1.3.2.1 @@ -26,7 +26,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @createdAugust 16, 2001 * @seeJBossMQ subproject - * @version$Revision: 1.3 $ + * @version$Revision: 1.3.2.1 $ */ public class JBossMQService extends org.jboss.util.ServiceMBeanSupport implements JBossMQServiceMBean { MBeanServer mBeanServer = null; 1.2.2.1 +1 -1 jbossmq/src/main/org/jboss/mq/server/JBossMQServiceMBean.java Index: JBossMQServiceMBean.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JBossMQServiceMBean.java,v retrieving revision 1.2 retrieving revision 1.2.2.1 diff -u -r1.2 -r1.2.2.1 --- JBossMQServiceMBean.java 2001/08/17 03:04:06 1.2 +++ JBossMQServiceMBean.java 2001/08/23 03:57:12 1.2.2.1 @@ -62,7 +62,7 @@ * @author Vincent Sheffer ([EMAIL PROTECTED]) * @createdAugust 16, 2001 * @see - * @version$Revision: 1.2 $ + * @version$Revision: 1.2.2.1 $ */ public interface JBossMQServiceMBean extends org.jboss.util.ServiceMBean { 1.3.2.1 +1 -1 jbossmq/src/main/org/jboss/mq/server/JMSDestination.java Index: JMSDestination.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSDestination.java,v retrieving revision 1.3 retrieving revision 1.3.2.1 diff -u -r1.3 -r1.3.2.1 --- JMSDestination.java 2001/08/17 03:04:06 1.3 +++ JMSDestination.java 2001/08/23 03:57:12 1.3.2.1 @@ -26,7 +26,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @createdAugust 16, 2001 - * @version$Revision: 1.3 $ + * @version$Revision: 1.3.2.1 $ */ public abstract class JMSDestination { 1.4.2.1 +1 -1 jbossmq/src/main/org/jboss/mq/server/JMSQueue.java Index: JMSQueue.java === RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v retrieving revision 1.4 retriev