User: starksm Date: 01/12/13 07:23:19 Modified: src/main/org/jboss/mq SpyConnectionConsumer.java Log: Use JBoss Logger and trace level priority msg in run loop Revision Changes Path 1.7 +89 -43 jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java Index: SpyConnectionConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- SpyConnectionConsumer.java 2001/12/11 22:18:09 1.6 +++ SpyConnectionConsumer.java 2001/12/13 15:23:19 1.7 @@ -11,17 +11,21 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.ServerSession; - import javax.jms.ServerSessionPool; +import org.jboss.logging.Logger; + /** * This class implements javax.jms.ConnectionConsumer * * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.6 $ + * @version $Revision: 1.7 $ */ -public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer, SpyConsumer, Runnable { +public class SpyConnectionConsumer + implements javax.jms.ConnectionConsumer, SpyConsumer, Runnable +{ + static Logger log = Logger.getLogger( SpyConnectionConsumer.class ); // The connection is the consumer was created with Connection connection; @@ -41,7 +45,6 @@ // The "listening" thread that gets messages from destination and queues them for delivery to sessions Thread internalThread; - static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance( SpyConnectionConsumer.class ); /** * SpyConnectionConsumer constructor comment. @@ -88,19 +91,26 @@ } public void addMessage( SpyMessage mes ) - throws JMSException { - cat.debug( "" + this + "->addMessage(mes=" + mes + ")" ); - synchronized ( queue ) { - if ( closed ) { - cat.debug( "WARNING: NACK issued. The connection consumer was closed." ); + throws JMSException + { + if( log.isTraceEnabled() ) + log.trace( "" + this + "->addMessage(mes=" + mes + ")" ); + synchronized ( queue ) + { + if ( closed ) + { + log.warn( "NACK issued. The connection consumer was closed." ); connection.send( mes.getAcknowledgementRequest( false ) ); return; } - if ( waitingForMessage ) { + if ( waitingForMessage ) + { queue.addLast( mes ); queue.notifyAll(); - } else { + } + else + { //unwanted message (due to consumer receive timing out?) Nack it. connection.send( mes.getAcknowledgementRequest( false ) ); } @@ -113,24 +123,33 @@ * @exception javax.jms.JMSException Description of Exception */ public void close() - throws javax.jms.JMSException { - synchronized ( queue ) { - if ( closed ) { + throws javax.jms.JMSException + { + synchronized ( queue ) + { + if ( closed ) + { return; } closed = true; queue.notifyAll(); } - if ( internalThread != null && !internalThread.equals( Thread.currentThread() ) ) { - try { + if ( internalThread != null && !internalThread.equals( Thread.currentThread() ) ) + { + try + { internalThread.join(); - } catch ( InterruptedException e ) { } + catch ( InterruptedException e ) + { + } } - synchronized ( queue ) { + synchronized ( queue ) + { //nack messages on queue - while ( !queue.isEmpty() ) { + while ( !queue.isEmpty() ) + { SpyMessage message = ( SpyMessage )queue.removeFirst(); connection.send( message.getAcknowledgementRequest( false ) ); } @@ -138,41 +157,56 @@ } } - public String toString() { + public String toString() + { return "SpyConnectionConsumer:" + destination; } //Used to facilitate delivery of messages to sessions from server session pool. - public void run() { + public void run() + { java.util.ArrayList mesList = new java.util.ArrayList(); - try { + try + { + boolean trace = log.isTraceEnabled(); outer : - while ( true ) { - synchronized( queue ){ + while ( true ) + { + synchronized( queue ) + { if(closed) break outer; } //get Messages - for(int i=0;i<maxMessages;i++){ + for(int i=0;i<maxMessages;i++) + { SpyMessage mes = connection.receive(subscription, -1); //receive no wait if(mes == null) break; else mesList.add(mes); } - if(mesList.isEmpty()){ + if(mesList.isEmpty()) + { SpyMessage mes = null; - synchronized ( queue ) { + synchronized ( queue ) + { mes = connection.receive( subscription, 0 ); - if ( mes == null ) { + if ( mes == null ) + { waitingForMessage = true; - while ( queue.isEmpty() && !closed ) { - try { + while ( queue.isEmpty() && !closed ) + { + try + { queue.wait(); - } catch ( InterruptedException e ) { + } + catch ( InterruptedException e ) + { } } - if ( closed ) { + if ( closed ) + { waitingForMessage = false; break outer; } @@ -186,28 +220,40 @@ ServerSession serverSession = serverSessionPool.getServerSession(); SpySession spySession = ( SpySession )serverSession.getSession(); - if ( spySession.sessionConsumer == null ) { - cat.debug( "" + this + " Session did not have a set MessageListner" ); - } else { + if ( spySession.sessionConsumer == null ) + { + log.debug( "" + this + " Session did not have a set MessageListner" ); + } + else + { spySession.sessionConsumer.subscription = subscription; } - for(int i=0;i<mesList.size();i++){ + for(int i=0;i<mesList.size();i++) + { spySession.addMessage( (SpyMessage)mesList.get(i) ); } - cat.debug( "" + this + " Starting the ServerSession." ); + if( trace ) + log.trace( "" + this + " Starting the ServerSession." ); serverSession.start(); mesList.clear(); } - } catch ( JMSException e ) { - cat.warn( "Connection consumer closing due to error in listening thread.", e ); - try { - for(int i=0;i<mesList.size();i++){ - connection.send( ((SpyMessage)mesList.get(i)).getAcknowledgementRequest( false ) ); + } + catch ( JMSException e ) + { + log.warn( "Connection consumer closing due to error in listening thread.", e ); + try + { + for(int i=0;i<mesList.size();i++) + { + SpyMessage msg = (SpyMessage)mesList.get(i); + connection.send(msg.getAcknowledgementRequest( false ) ); } close(); - } catch ( Exception ignore ) { + } + catch ( Exception ignore ) + { } } }
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development