User: starksm
Date: 01/12/13 07:23:19
Modified: src/main/org/jboss/mq SpyConnectionConsumer.java
Log:
Use JBoss Logger and trace level priority msg in run loop
Revision Changes Path
1.7 +89 -43 jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java
Index: SpyConnectionConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyConnectionConsumer.java 2001/12/11 22:18:09 1.6
+++ SpyConnectionConsumer.java 2001/12/13 15:23:19 1.7
@@ -11,17 +11,21 @@
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ServerSession;
-
import javax.jms.ServerSessionPool;
+import org.jboss.logging.Logger;
+
/**
* This class implements javax.jms.ConnectionConsumer
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
-public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer,
SpyConsumer, Runnable {
+public class SpyConnectionConsumer
+ implements javax.jms.ConnectionConsumer, SpyConsumer, Runnable
+{
+ static Logger log = Logger.getLogger( SpyConnectionConsumer.class );
// The connection is the consumer was created with
Connection connection;
@@ -41,7 +45,6 @@
// The "listening" thread that gets messages from destination and queues them
for delivery to sessions
Thread internalThread;
- static org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance(
SpyConnectionConsumer.class );
/**
* SpyConnectionConsumer constructor comment.
@@ -88,19 +91,26 @@
}
public void addMessage( SpyMessage mes )
- throws JMSException {
- cat.debug( "" + this + "->addMessage(mes=" + mes + ")" );
- synchronized ( queue ) {
- if ( closed ) {
- cat.debug( "WARNING: NACK issued. The connection consumer was closed."
);
+ throws JMSException
+ {
+ if( log.isTraceEnabled() )
+ log.trace( "" + this + "->addMessage(mes=" + mes + ")" );
+ synchronized ( queue )
+ {
+ if ( closed )
+ {
+ log.warn( "NACK issued. The connection consumer was closed." );
connection.send( mes.getAcknowledgementRequest( false ) );
return;
}
- if ( waitingForMessage ) {
+ if ( waitingForMessage )
+ {
queue.addLast( mes );
queue.notifyAll();
- } else {
+ }
+ else
+ {
//unwanted message (due to consumer receive timing out?) Nack it.
connection.send( mes.getAcknowledgementRequest( false ) );
}
@@ -113,24 +123,33 @@
* @exception javax.jms.JMSException Description of Exception
*/
public void close()
- throws javax.jms.JMSException {
- synchronized ( queue ) {
- if ( closed ) {
+ throws javax.jms.JMSException
+ {
+ synchronized ( queue )
+ {
+ if ( closed )
+ {
return;
}
closed = true;
queue.notifyAll();
}
- if ( internalThread != null && !internalThread.equals( Thread.currentThread()
) ) {
- try {
+ if ( internalThread != null && !internalThread.equals( Thread.currentThread()
) )
+ {
+ try
+ {
internalThread.join();
- } catch ( InterruptedException e ) {
}
+ catch ( InterruptedException e )
+ {
+ }
}
- synchronized ( queue ) {
+ synchronized ( queue )
+ {
//nack messages on queue
- while ( !queue.isEmpty() ) {
+ while ( !queue.isEmpty() )
+ {
SpyMessage message = ( SpyMessage )queue.removeFirst();
connection.send( message.getAcknowledgementRequest( false ) );
}
@@ -138,41 +157,56 @@
}
}
- public String toString() {
+ public String toString()
+ {
return "SpyConnectionConsumer:" + destination;
}
//Used to facilitate delivery of messages to sessions from server session pool.
- public void run() {
+ public void run()
+ {
java.util.ArrayList mesList = new java.util.ArrayList();
- try {
+ try
+ {
+ boolean trace = log.isTraceEnabled();
outer :
- while ( true ) {
- synchronized( queue ){
+ while ( true )
+ {
+ synchronized( queue )
+ {
if(closed)
break outer;
}
//get Messages
- for(int i=0;i<maxMessages;i++){
+ for(int i=0;i<maxMessages;i++)
+ {
SpyMessage mes = connection.receive(subscription, -1); //receive no
wait
if(mes == null)
break;
else
mesList.add(mes);
}
- if(mesList.isEmpty()){
+ if(mesList.isEmpty())
+ {
SpyMessage mes = null;
- synchronized ( queue ) {
+ synchronized ( queue )
+ {
mes = connection.receive( subscription, 0 );
- if ( mes == null ) {
+ if ( mes == null )
+ {
waitingForMessage = true;
- while ( queue.isEmpty() && !closed ) {
- try {
+ while ( queue.isEmpty() && !closed )
+ {
+ try
+ {
queue.wait();
- } catch ( InterruptedException e ) {
+ }
+ catch ( InterruptedException e )
+ {
}
}
- if ( closed ) {
+ if ( closed )
+ {
waitingForMessage = false;
break outer;
}
@@ -186,28 +220,40 @@
ServerSession serverSession = serverSessionPool.getServerSession();
SpySession spySession = ( SpySession )serverSession.getSession();
- if ( spySession.sessionConsumer == null ) {
- cat.debug( "" + this + " Session did not have a set MessageListner"
);
- } else {
+ if ( spySession.sessionConsumer == null )
+ {
+ log.debug( "" + this + " Session did not have a set MessageListner"
);
+ }
+ else
+ {
spySession.sessionConsumer.subscription = subscription;
}
- for(int i=0;i<mesList.size();i++){
+ for(int i=0;i<mesList.size();i++)
+ {
spySession.addMessage( (SpyMessage)mesList.get(i) );
}
- cat.debug( "" + this + " Starting the ServerSession." );
+ if( trace )
+ log.trace( "" + this + " Starting the ServerSession." );
serverSession.start();
mesList.clear();
}
- } catch ( JMSException e ) {
- cat.warn( "Connection consumer closing due to error in listening thread.",
e );
- try {
- for(int i=0;i<mesList.size();i++){
- connection.send(
((SpyMessage)mesList.get(i)).getAcknowledgementRequest( false ) );
+ }
+ catch ( JMSException e )
+ {
+ log.warn( "Connection consumer closing due to error in listening thread.",
e );
+ try
+ {
+ for(int i=0;i<mesList.size();i++)
+ {
+ SpyMessage msg = (SpyMessage)mesList.get(i);
+ connection.send(msg.getAcknowledgementRequest( false ) );
}
close();
- } catch ( Exception ignore ) {
+ }
+ catch ( Exception ignore )
+ {
}
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development