User: chirino Date: 01/12/12 14:08:08 Modified: src/main/org/jboss/mq SpyMessageConsumer.java SpyXAResource.java SpyXAResourceManager.java Log: Changed the XA handling so that it supports using an anonymous tx. That is, MQ creates and uses a temp tx id on message delivery until the xaresource is assigned a txid. This allows the TX to be associated with the TM in the onMessage() method of a message listener. Revision Changes Path 1.14 +31 -6 jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java Index: SpyMessageConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v retrieving revision 1.13 retrieving revision 1.14 diff -u -r1.13 -r1.14 --- SpyMessageConsumer.java 2001/12/08 20:21:49 1.13 +++ SpyMessageConsumer.java 2001/12/12 22:08:07 1.14 @@ -23,7 +23,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.13 $ + * @version $Revision: 1.14 $ */ public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable @@ -425,10 +425,18 @@ synchronized ( stateLock ) { thisListener = messageListener; } + // Add the message to XAResource manager before we call onMessages since the // resource may get elisted IN the onMessage method. This gives onMessage a chance to roll the message back. 
- if ( session.transacted ) + Object anonymousTXID=null; + if ( session.transacted ) { + // Only happens with XA transactions + if( session.currentTransactionId == null ) { + anonymousTXID = session.connection.spyXAResourceManager.startTx(); + session.currentTransactionId = anonymousTXID; + } session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, message ); + } if ( thisListener != null ) { Message mes = message; @@ -438,11 +446,28 @@ thisListener.onMessage( mes ); } - // Should we Auto-ack the message since the message has now been processesed - if ( !session.transacted ) { - if ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) { + if (session.transacted) { + // If we started an anonymous tx + if (anonymousTXID != null) { + if (session.currentTransactionId == anonymousTXID) { + // This is bad.. We are an XA controled TX but no TM ever elisted us. + // rollback the work and spit an error + try { + session.connection.spyXAResourceManager.endTx(anonymousTXID, true); + session.connection.spyXAResourceManager.rollback(anonymousTXID); + } catch (javax.transaction.xa.XAException e) { + cat.error("Could not rollback", e); + } finally { + session.currentTransactionId = null; + } + throw new SpyJMSException("Messaged delivery was not controled by a Transaction Manager"); + } + } + } else { + // Should we Auto-ack the message since the message has now been processesed + if (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) { message.doAcknowledge(); - } + } } } 1.3 +15 -3 jbossmq/src/main/org/jboss/mq/SpyXAResource.java Index: SpyXAResource.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResource.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- SpyXAResource.java 2001/08/17 03:04:01 1.2 +++ SpyXAResource.java 
2001/12/12 22:08:07 1.3 @@ -16,7 +16,7 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class SpyXAResource implements XAResource { @@ -190,15 +190,27 @@ */ public void start( javax.transaction.xa.Xid xid, int flags ) throws javax.transaction.xa.XAException { + + boolean convertTx=false; if ( session.currentTransactionId != null ) { - throw new XAException( XAException.XAER_OUTSIDE ); + if( flags==TMNOFLAGS && session.currentTransactionId instanceof Long ) { + convertTx=true; + } else { + throw new XAException( XAException.XAER_OUTSIDE ); + } } synchronized ( session.runLock ) { switch ( flags ) { case TMNOFLAGS: - session.currentTransactionId = session.connection.spyXAResourceManager.startTx( xid ); + if( convertTx ) { + // it was an anonymous TX, TM is now taking control over it. + // convert it over to a normal XID tansaction. + session.currentTransactionId = session.connection.spyXAResourceManager.convertTx( (Long)session.currentTransactionId, xid ); + } else { + session.currentTransactionId = session.connection.spyXAResourceManager.startTx( xid ); + } break; case TMJOIN: session.currentTransactionId = session.connection.spyXAResourceManager.joinTx( xid ); 1.6 +16 -3 jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java Index: SpyXAResourceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- SpyXAResourceManager.java 2001/11/26 23:13:32 1.5 +++ SpyXAResourceManager.java 2001/12/12 22:08:07 1.6 @@ -20,14 +20,13 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.5 $ + * @version $Revision: 1.6 $ */ public class SpyXAResourceManager implements java.io.Serializable { ////////////////////////////////////////////////////////////////// // 
Attributes ////////////////////////////////////////////////////////////////// - Connection connection; Map transactions = java.util.Collections.synchronizedMap( new HashMap() ); long nextInternalXid = Long.MIN_VALUE; @@ -74,7 +73,7 @@ state.sentMessages.addLast( msg ); } - public void commit( Object xid, boolean onePhase ) + public void commit( Object xid, boolean onePhase ) throws XAException, JMSException { TXState state = ( TXState )transactions.remove( xid ); if ( state == null ) { @@ -159,6 +158,7 @@ public void rollback( Object xid ) throws XAException, JMSException { + TXState state = ( TXState )transactions.remove( xid ); if ( state == null ) { throw new XAException( XAException.XAER_NOTA ); @@ -206,6 +206,19 @@ if ( !transactions.containsKey( xid ) ) { throw new XAException( XAException.XAER_NOTA ); } + return xid; + } + + public Object convertTx( Long anonXid, Xid xid ) throws XAException { + + if ( !transactions.containsKey( anonXid ) ) { + throw new XAException( XAException.XAER_NOTA ); + } + if ( transactions.containsKey( xid ) ) { + throw new XAException( XAException.XAER_DUPID ); + } + TXState s = (TXState)transactions.remove( anonXid ); + transactions.put( xid, s ); return xid; }
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development