User: starksm Date: 01/12/13 19:19:13 Modified: src/main/org/jboss/mq Tag: Branch_2_4 SpyMessageConsumer.java SpyXAResource.java SpyXAResourceManager.java Log: Merge changes from main to improve the ASF behavior and update the logging. Revision Changes Path No revision No revision 1.7.2.6 +97 -36 jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java Index: SpyMessageConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v retrieving revision 1.7.2.5 retrieving revision 1.7.2.6 diff -u -r1.7.2.5 -r1.7.2.6 --- SpyMessageConsumer.java 2001/12/13 21:29:21 1.7.2.5 +++ SpyMessageConsumer.java 2001/12/14 03:19:12 1.7.2.6 @@ -11,28 +11,26 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; - import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.jboss.logging.Logger; -import org.jboss.mq.selectors.Selector; /** - * This class implements javax.jms.MessageConsumer + * This class implements <tt>javax.jms.MessageConsumer</tt>. * * @author Norbert Lataille ([EMAIL PROTECTED]) * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.7.2.5 $ + * @version $Revision: 1.7.2.6 $ */ public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable { static Logger log = Logger.getLogger( SpyMessageConsumer.class ); - + //Link to my session public SpySession session; // The subscription structure should be fill out by the decendent @@ -100,7 +98,7 @@ // Public -------------------------------------------------------- public String getMessageSelector() - throws JMSException + throws JMSException { if ( closed ) { @@ -111,7 +109,7 @@ } public MessageListener getMessageListener() - throws JMSException + throws JMSException { if ( closed ) { @@ -150,7 +148,6 @@ synchronized ( messages ) { - SpyMessage msg = session.connection.receive( subscription, 0 ); if ( msg != null ) { @@ -180,15 +177,17 @@ return mes; } if( log.isTraceEnabled() ) - log.trace( "SpyMessageConsumer: receive in messages.wait()" ); + log.trace("receive in messages.wait()"); messages.wait(); } - } catch ( InterruptedException e ) + } + catch ( InterruptedException e ) { JMSException newE = new SpyJMSException( "Receive interupted" ); newE.setLinkedException( e ); throw newE; - } finally + } + finally { waitingForMessage = false; synchronized ( stateLock ) @@ -201,7 +200,7 @@ } public Message receive( long timeOut ) - throws JMSException + throws JMSException { if ( timeOut == 0 ) { @@ -230,7 +229,6 @@ synchronized ( messages ) { - SpyMessage msg = session.connection.receive( subscription, timeOut ); if ( msg != null ) { @@ -270,7 +268,8 @@ messages.wait( att ); } - } catch ( InterruptedException e ) + } + catch ( InterruptedException e ) { JMSException newE = new SpyJMSException( "Receive interupted" ); newE.setLinkedException( e ); @@ -288,7 +287,7 @@ } public Message receiveNoWait() - throws JMSException + throws JMSException { if ( closed ) { @@ -321,9 +320,11 @@ } public void close() - throws JMSException + throws JMSException { + log.debug("Message consumer closing."); + synchronized ( messages ) { if ( closed ) @@ -335,6 +336,16 @@ messages.notify(); } + if ( listenerThread != null && !Thread.currentThread().equals(listenerThread) ) + { + try + { + listenerThread.join(); + } + catch(InterruptedException e) + { } + } + if ( !sessionConsumer ) { session.removeConsumer( this ); @@ -355,28 +366,33 @@ } //Add a message to the queue + + // Consider removing this test (subscription.accepts). I don't think it will ever fail + // because the test is also done by the server before message is even sent. if ( subscription.accepts( message ) ) { if ( sessionConsumer ) { sessionConsumerProcessMessage( message ); - } else + } + else { if ( waitingForMessage ) { messages.addLast( message ); messages.notifyAll(); - } else + } + else { //unwanted message (due to consumer receive timing out) Nack it. + log.debug( "WARNING: NACK issued. The message consumer was not waiting for a message." ); session.connection.send( message.getAcknowledgementRequest( false ) ); } } } else { - if( log.isTraceEnabled() ) - log.trace( "WARNING: NACK issued. The subscription did not accept the message" ); + log.debug( "WARNING: NACK issued. The subscription did not accept the message" ); session.connection.send( message.getAcknowledgementRequest( false ) ); } } @@ -388,6 +404,7 @@ SpyMessage mes = null; try { + boolean trace = log.isTraceEnabled(); outer : while ( true ) { @@ -423,10 +440,15 @@ waitingForMessage = false; } } + + if( trace ) + log.trace("Recevied msg: "+mes); mes.session = session; if ( mes.isOutdated() ) { //Drop message (it has expired) + if( trace ) + log.trace("Droping expired msg: "+mes); mes.doAcknowledge(); mes = null; } @@ -503,13 +525,12 @@ } protected void sessionConsumerProcessMessage( SpyMessage message ) - throws JMSException + throws JMSException { message.session = session; if ( message.isOutdated() ) { - if( log.isTraceEnabled() ) - log.trace( "SessionQueue: I dropped a message (timeout)" ); + log.debug( "I dropped a message (timeout)" ); message.doAcknowledge(); return; } @@ -519,6 +540,21 @@ { thisListener = messageListener; } + + // Add the message to XAResource manager before we call onMessages since the + // resource may get elisted IN the onMessage method. This gives onMessage a chance to roll the message back. + Object anonymousTXID=null; + if ( session.transacted ) + { + // Only happens with XA transactions + if( session.currentTransactionId == null ) + { + anonymousTXID = session.connection.spyXAResourceManager.startTx(); + session.currentTransactionId = anonymousTXID; + } + session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, message ); + } + if ( thisListener != null ) { Message mes = message; @@ -528,21 +564,45 @@ } thisListener.onMessage( mes ); } - //ack - if ( session.transacted ) + + if (session.transacted) { - session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, message ); - } else if ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) + // If we started an anonymous tx + if (anonymousTXID != null) + { + if (session.currentTransactionId == anonymousTXID) + { + // This is bad.. We are an XA controled TX but no TM ever elisted us. + // rollback the work and spit an error + try + { + session.connection.spyXAResourceManager.endTx(anonymousTXID, true); + session.connection.spyXAResourceManager.rollback(anonymousTXID); + } catch (javax.transaction.xa.XAException e) + { + log.error("Could not rollback", e); + } finally + { + session.currentTransactionId = null; + } + throw new SpyJMSException("Messaged delivery was not controled by a Transaction Manager"); + } + } + } + else { - message.doAcknowledge(); + // Should we Auto-ack the message since the message has now been processesed + if (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) + { + message.doAcknowledge(); + } } } Message getMessage() { synchronized ( messages ) - { - + { while ( true ) { @@ -563,24 +623,24 @@ } return rc; - } catch ( Exception e ) + } + catch ( Exception e ) { e.printStackTrace(); } } } } - + Message preProcessMessage( SpyMessage message ) - throws JMSException + throws JMSException { message.session = session; // Has the message expired? if ( message.isOutdated() ) { - if( log.isTraceEnabled() ) - log.trace( "SessionQueue: I dropped a message (timeout)" ); + log.debug( "I dropped a message (timeout)" ); message.doAcknowledge(); return null; } @@ -602,7 +662,8 @@ return ( ( SpyEncapsulatedMessage )message ).getMessage(); } return message; - } else + } + else { return message; } 1.2.2.2 +90 -49 jbossmq/src/main/org/jboss/mq/SpyXAResource.java Index: SpyXAResource.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResource.java,v retrieving revision 1.2.2.1 retrieving revision 1.2.2.2 diff -u -r1.2.2.1 -r1.2.2.2 --- SpyXAResource.java 2001/08/23 03:57:08 1.2.2.1 +++ SpyXAResource.java 2001/12/14 03:19:12 1.2.2.2 @@ -16,24 +16,26 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.2.2.1 $ + * @version $Revision: 1.2.2.2 $ */ -public class SpyXAResource implements XAResource { - +public class SpyXAResource implements XAResource +{ + ////////////////////////////////////////////////////////////////// // Attributes ////////////////////////////////////////////////////////////////// - + SpySession session; - + ////////////////////////////////////////////////////////////////// // Constructors ////////////////////////////////////////////////////////////////// - - SpyXAResource( SpySession session ) { + + SpyXAResource( SpySession session ) + { this.session = session; } - + /** * setTransactionTimeout method comment. * @@ -44,10 +46,11 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public boolean setTransactionTimeout( int arg1 ) - throws javax.transaction.xa.XAException { + throws javax.transaction.xa.XAException + { return false; } - + /** * getTransactionTimeout method comment. * @@ -55,10 +58,11 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public int getTransactionTimeout() - throws javax.transaction.xa.XAException { + throws javax.transaction.xa.XAException + { return 0; } - + /** * isSameRM method comment. * @@ -67,17 +71,19 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public boolean isSameRM( javax.transaction.xa.XAResource arg1 ) - throws javax.transaction.xa.XAException { - if ( !( arg1 instanceof SpyXAResource ) ) { + throws javax.transaction.xa.XAException + { + if ( !( arg1 instanceof SpyXAResource ) ) + { return false; } return ( ( SpyXAResource )arg1 ).session.connection.spyXAResourceManager == session.connection.spyXAResourceManager; } - + ////////////////////////////////////////////////////////////////// // Public Methods ////////////////////////////////////////////////////////////////// - + /** * commit method comment. * @@ -86,14 +92,17 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public void commit( javax.transaction.xa.Xid xid, boolean onePhase ) - throws javax.transaction.xa.XAException { - try { + throws javax.transaction.xa.XAException + { + try + { session.connection.spyXAResourceManager.commit( xid, onePhase ); - } catch ( JMSException e ) { + } catch ( JMSException e ) + { throw new XAException( XAException.XAER_RMERR ); } } - + /** * end method comment. * @@ -102,14 +111,18 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public void end( javax.transaction.xa.Xid xid, int flags ) - throws javax.transaction.xa.XAException { - if ( session.currentTransactionId == null ) { + throws javax.transaction.xa.XAException + { + if ( session.currentTransactionId == null ) + { throw new XAException( XAException.XAER_OUTSIDE ); } - - synchronized ( session.runLock ) { - - switch ( flags ) { + + synchronized ( session.runLock ) + { + + switch ( flags ) + { case TMSUSPEND: session.currentTransactionId = null; session.connection.spyXAResourceManager.suspendTx( xid ); @@ -125,7 +138,7 @@ } } } - + /** * forget method comment. * @@ -133,9 +146,10 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public void forget( javax.transaction.xa.Xid arg1 ) - throws javax.transaction.xa.XAException { + throws javax.transaction.xa.XAException + { } - + /** * prepare method comment. * @@ -145,14 +159,17 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public int prepare( javax.transaction.xa.Xid xid ) - throws javax.transaction.xa.XAException { - try { + throws javax.transaction.xa.XAException + { + try + { return session.connection.spyXAResourceManager.prepare( xid ); - } catch ( JMSException e ) { + } catch ( JMSException e ) + { throw new XAException( XAException.XAER_RMERR ); } } - + /** * recover method comment. * @@ -162,10 +179,11 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public Xid[] recover( int arg1 ) - throws javax.transaction.xa.XAException { + throws javax.transaction.xa.XAException + { return new Xid[0]; } - + /** * rollback method comment. * @@ -173,14 +191,17 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public void rollback( javax.transaction.xa.Xid xid ) - throws javax.transaction.xa.XAException { - try { + throws javax.transaction.xa.XAException + { + try + { session.connection.spyXAResourceManager.rollback( xid ); - } catch ( JMSException e ) { + } catch ( JMSException e ) + { throw new XAException( XAException.XAER_RMERR ); } } - + /** * start method comment. * @@ -189,16 +210,36 @@ * @exception javax.transaction.xa.XAException Description of Exception */ public void start( javax.transaction.xa.Xid xid, int flags ) - throws javax.transaction.xa.XAException { - if ( session.currentTransactionId != null ) { - throw new XAException( XAException.XAER_OUTSIDE ); + throws javax.transaction.xa.XAException + { + + boolean convertTx=false; + if ( session.currentTransactionId != null ) + { + if( flags==TMNOFLAGS && session.currentTransactionId instanceof Long ) + { + convertTx=true; + } else + { + throw new XAException( XAException.XAER_OUTSIDE ); + } } - - synchronized ( session.runLock ) { - - switch ( flags ) { + + synchronized ( session.runLock ) + { + + switch ( flags ) + { case TMNOFLAGS: - session.currentTransactionId = session.connection.spyXAResourceManager.startTx( xid ); + if( convertTx ) + { + // it was an anonymous TX, TM is now taking control over it. + // convert it over to a normal XID tansaction. + session.currentTransactionId = session.connection.spyXAResourceManager.convertTx( (Long)session.currentTransactionId, xid ); + } else + { + session.currentTransactionId = session.connection.spyXAResourceManager.startTx( xid ); + } break; case TMJOIN: session.currentTransactionId = session.connection.spyXAResourceManager.joinTx( xid ); @@ -208,8 +249,8 @@ break; } session.runLock.notify(); - + } - + } } 1.2.2.2 +110 -63 jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java Index: SpyXAResourceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java,v retrieving revision 1.2.2.1 retrieving revision 1.2.2.2 diff -u -r1.2.2.1 -r1.2.2.2 --- SpyXAResourceManager.java 2001/08/23 03:57:08 1.2.2.1 +++ SpyXAResourceManager.java 2001/12/14 03:19:12 1.2.2.2 @@ -20,88 +20,96 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.2.2.1 $ + * @version $Revision: 1.2.2.2 $ */ -public class SpyXAResourceManager implements java.io.Serializable { - +public class SpyXAResourceManager implements java.io.Serializable +{ + ////////////////////////////////////////////////////////////////// // Attributes ////////////////////////////////////////////////////////////////// - Connection connection; Map transactions = java.util.Collections.synchronizedMap( new HashMap() ); long nextInternalXid = Long.MIN_VALUE; - + //Valid tx states: private final static byte TX_OPEN = 0; private final static byte TX_SUSPENDED = 2; private final static byte TX_PREPARED = 3; private final static byte TX_COMMITED = 4; private final static byte TX_ROLLEDBACK = 5; - + private final static byte TX_ENDED = 1; - + ////////////////////////////////////////////////////////////////// // Constructors ////////////////////////////////////////////////////////////////// - - public SpyXAResourceManager( Connection conn ) { + + public SpyXAResourceManager( Connection conn ) + { super(); connection = conn; } - - + + ////////////////////////////////////////////////////////////////// // Public Methods ////////////////////////////////////////////////////////////////// - + public void ackMessage( Object xid, SpyMessage msg ) - throws JMSException { + throws JMSException + { TXState state = ( TXState )transactions.get( xid ); - if ( state == null ) { + if ( state == null ) + { throw new JMSException( "Invalid transaction id." ); } - AcknowledgementRequest item = new AcknowledgementRequest(); - item.destination = msg.getJMSDestination(); - item.messageID = msg.getJMSMessageID(); - item.isAck = true; - item.subscriberId = msg.routeToSubscriber; + AcknowledgementRequest item = msg.getAcknowledgementRequest(true); state.ackedMessages.addLast( item ); } - + public void addMessage( Object xid, SpyMessage msg ) - throws JMSException { + throws JMSException + { TXState state = ( TXState )transactions.get( xid ); - if ( state == null ) { + if ( state == null ) + { throw new JMSException( "Invalid transaction id." ); } state.sentMessages.addLast( msg ); } - + public void commit( Object xid, boolean onePhase ) - throws XAException, JMSException { + throws XAException, JMSException + { TXState state = ( TXState )transactions.remove( xid ); - if ( state == null ) { + if ( state == null ) + { throw new XAException( XAException.XAER_NOTA ); } - - if ( onePhase ) { + + if ( onePhase ) + { TransactionRequest transaction = new TransactionRequest(); transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST; transaction.xid = null; - if ( state.sentMessages.size() != 0 ) { + if ( state.sentMessages.size() != 0 ) + { SpyMessage job[] = new SpyMessage[state.sentMessages.size()]; job = ( SpyMessage[] )state.sentMessages.toArray( job ); transaction.messages = job; } - if ( state.ackedMessages.size() != 0 ) { + if ( state.ackedMessages.size() != 0 ) + { AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()]; job = ( AcknowledgementRequest[] )state.ackedMessages.toArray( job ); transaction.acks = job; } connection.send( transaction ); - } else { - if ( state.txState != TX_PREPARED ) { + } else + { + if ( state.txState != TX_PREPARED ) + { throw new XAException( "The transaction had not been prepared" ); } TransactionRequest transaction = new TransactionRequest(); @@ -111,39 +119,47 @@ } state.txState = TX_COMMITED; } - + public void endTx( Object xid, boolean success ) - throws XAException { + throws XAException + { TXState state = ( TXState )transactions.get( xid ); - if ( state == null ) { + if ( state == null ) + { throw new XAException( XAException.XAER_NOTA ); } state.txState = TX_ENDED; } - + public Object joinTx( Xid xid ) - throws XAException { - if ( !transactions.containsKey( xid ) ) { + throws XAException + { + if ( !transactions.containsKey( xid ) ) + { throw new XAException( XAException.XAER_NOTA ); } return xid; } - + public int prepare( Object xid ) - throws XAException, JMSException { + throws XAException, JMSException + { TXState state = ( TXState )transactions.get( xid ); - if ( state == null ) { + if ( state == null ) + { throw new XAException( XAException.XAER_NOTA ); } TransactionRequest transaction = new TransactionRequest(); transaction.requestType = transaction.TWO_PHASE_COMMIT_PREPARE_REQUEST; transaction.xid = xid; - if ( state.sentMessages.size() != 0 ) { + if ( state.sentMessages.size() != 0 ) + { SpyMessage job[] = new SpyMessage[state.sentMessages.size()]; job = ( SpyMessage[] )state.sentMessages.toArray( job ); transaction.messages = job; } - if ( state.ackedMessages.size() != 0 ) { + if ( state.ackedMessages.size() != 0 ) + { AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()]; job = ( AcknowledgementRequest[] )state.ackedMessages.toArray( job ); transaction.acks = job; @@ -152,36 +168,45 @@ state.txState = TX_PREPARED; return javax.transaction.xa.XAResource.XA_OK; } - + public Object resumeTx( Xid xid ) - throws XAException { - if ( !transactions.containsKey( xid ) ) { + throws XAException + { + if ( !transactions.containsKey( xid ) ) + { throw new XAException( XAException.XAER_NOTA ); } return xid; } - + public void rollback( Object xid ) - throws XAException, JMSException { + throws XAException, JMSException + { + TXState state = ( TXState )transactions.remove( xid ); - if ( state == null ) { + if ( state == null ) + { throw new XAException( XAException.XAER_NOTA ); } - if ( state.txState != TX_PREPARED ) { + if ( state.txState != TX_PREPARED ) + { TransactionRequest transaction = new TransactionRequest(); transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST; transaction.xid = null; - if ( state.ackedMessages.size() != 0 ) { + if ( state.ackedMessages.size() != 0 ) + { AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()]; job = ( AcknowledgementRequest[] )state.ackedMessages.toArray( job ); transaction.acks = job; //Neg Acknowlege all consumed messages - for ( int i = 0; i < transaction.acks.length; i++ ) { + for ( int i = 0; i < transaction.acks.length; i++ ) + { transaction.acks[i].isAck = false; } } connection.send( transaction ); - } else { + } else + { TransactionRequest transaction = new TransactionRequest(); transaction.xid = xid; transaction.requestType = transaction.TWO_PHASE_COMMIT_ROLLBACK_REQUEST; @@ -189,38 +214,60 @@ } state.txState = TX_ROLLEDBACK; } - - public synchronized Object startTx() { + + public synchronized Object startTx() + { Long newXid = new Long( nextInternalXid++ ); transactions.put( newXid, new TXState() ); return newXid; } - + public Object startTx( Xid xid ) - throws XAException { - if ( transactions.containsKey( xid ) ) { + throws XAException + { + if ( transactions.containsKey( xid ) ) + { throw new XAException( XAException.XAER_DUPID ); } transactions.put( xid, new TXState() ); return xid; } - + public Object suspendTx( Xid xid ) - throws XAException { - if ( !transactions.containsKey( xid ) ) { + throws XAException + { + if ( !transactions.containsKey( xid ) ) + { throw new XAException( XAException.XAER_NOTA ); } return xid; } - + + public Object convertTx( Long anonXid, Xid xid ) throws XAException + { + + if ( !transactions.containsKey( anonXid ) ) + { + throw new XAException( XAException.XAER_NOTA ); + } + if ( transactions.containsKey( xid ) ) + { + throw new XAException( XAException.XAER_DUPID ); + } + TXState s = (TXState)transactions.remove( anonXid ); + transactions.put( xid, s ); + return xid; + } + ////////////////////////////////////////////////////////////////// // Helper Inner classes ////////////////////////////////////////////////////////////////// - + /** * @created August 16, 2001 */ - class TXState { + class TXState + { byte txState = TX_OPEN; LinkedList sentMessages = new LinkedList(); LinkedList ackedMessages = new LinkedList();
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development