User: chirino 
  Date: 01/12/12 14:08:08

  Modified:    src/main/org/jboss/mq SpyMessageConsumer.java
                        SpyXAResource.java SpyXAResourceManager.java
  Log:
  Changed the XA handling so that it supports anonymous transactions.
  That is, MQ creates and uses a temporary tx id on message delivery until the
  XAResource is assigned a txid.  This allows the TX to be associated with the TM
  in the onMessage() method of a message listener.
  
  Revision  Changes    Path
  1.14      +31 -6     jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- SpyMessageConsumer.java   2001/12/08 20:21:49     1.13
  +++ SpyMessageConsumer.java   2001/12/12 22:08:07     1.14
  @@ -23,7 +23,7 @@
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @author     David Maplesden ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.13 $
  + * @version    $Revision: 1.14 $
    */
   public class SpyMessageConsumer
      implements MessageConsumer, SpyConsumer, Runnable
  @@ -425,10 +425,18 @@
         synchronized ( stateLock ) {
            thisListener = messageListener;
         }
  +
         // Add the message to XAResource manager before we call onMessages since the 
         // resource may get elisted IN the onMessage method.  This gives onMessage a 
chance to roll the message back.
  -      if ( session.transacted ) 
  +      Object anonymousTXID=null;
  +      if ( session.transacted ) {
  +         // Only happens with XA transactions
  +         if( session.currentTransactionId == null ) {
  +            anonymousTXID = session.connection.spyXAResourceManager.startTx();
  +            session.currentTransactionId = anonymousTXID;
  +         }
            session.connection.spyXAResourceManager.ackMessage( 
session.currentTransactionId, message );
  +      }
   
         if ( thisListener != null ) {
            Message mes = message;
  @@ -438,11 +446,28 @@
            thisListener.onMessage( mes );
         }
         
  -      // Should we Auto-ack the message since the message has now been processesed
  -      if ( !session.transacted ) {
  -         if ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) {
  +      if (session.transacted) {
  +         // If we started an anonymous tx
  +         if (anonymousTXID != null) {
  +            if (session.currentTransactionId == anonymousTXID) {
  +               // This is bad..  We are an XA controled TX but no TM ever elisted 
us.
  +               // rollback the work and spit an error 
  +               try {
  +                  session.connection.spyXAResourceManager.endTx(anonymousTXID, 
true);
  +                  session.connection.spyXAResourceManager.rollback(anonymousTXID);
  +               } catch (javax.transaction.xa.XAException e) {
  +                  cat.error("Could not rollback", e);
  +               } finally {
  +                  session.currentTransactionId = null;
  +               }
  +               throw new SpyJMSException("Messaged delivery was not controled by a 
Transaction Manager");               
  +            }
  +         }
  +      } else {
  +         // Should we Auto-ack the message since the message has now been 
processesed
  +         if (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
               message.doAcknowledge();
  -      }
  +         }
         }
      }
   
  
  
  
  1.3       +15 -3     jbossmq/src/main/org/jboss/mq/SpyXAResource.java
  
  Index: SpyXAResource.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResource.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyXAResource.java        2001/08/17 03:04:01     1.2
  +++ SpyXAResource.java        2001/12/12 22:08:07     1.3
  @@ -16,7 +16,7 @@
    *
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.2 $
  + * @version    $Revision: 1.3 $
    */
   public class SpyXAResource implements XAResource {
   
  @@ -190,15 +190,27 @@
       */
      public void start( javax.transaction.xa.Xid xid, int flags )
         throws javax.transaction.xa.XAException {
  +
  +      boolean convertTx=false;
         if ( session.currentTransactionId != null ) {
  -         throw new XAException( XAException.XAER_OUTSIDE );
  +      if( flags==TMNOFLAGS && session.currentTransactionId instanceof Long ) { 
  +            convertTx=true;
  +         } else {
  +            throw new XAException( XAException.XAER_OUTSIDE );
  +      }
         }
   
         synchronized ( session.runLock ) {
   
            switch ( flags ) {
               case TMNOFLAGS:
  -               session.currentTransactionId = 
session.connection.spyXAResourceManager.startTx( xid );
  +            if( convertTx ) {
  +                  // it was an anonymous TX, TM is now taking control over it.
  +               // convert it over to a normal XID tansaction.
  +               session.currentTransactionId = 
session.connection.spyXAResourceManager.convertTx( (Long)session.currentTransactionId, 
xid );
  +            } else {
  +                  session.currentTransactionId = 
session.connection.spyXAResourceManager.startTx( xid );
  +               }
                  break;
               case TMJOIN:
                  session.currentTransactionId = 
session.connection.spyXAResourceManager.joinTx( xid );
  
  
  
  1.6       +16 -3     jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java
  
  Index: SpyXAResourceManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyXAResourceManager.java 2001/11/26 23:13:32     1.5
  +++ SpyXAResourceManager.java 2001/12/12 22:08:07     1.6
  @@ -20,14 +20,13 @@
    *
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.5 $
  + * @version    $Revision: 1.6 $
    */
   public class SpyXAResourceManager implements java.io.Serializable {
   
      //////////////////////////////////////////////////////////////////
      // Attributes
      //////////////////////////////////////////////////////////////////
  -
      Connection       connection;
      Map              transactions = java.util.Collections.synchronizedMap( new 
HashMap() );
      long             nextInternalXid = Long.MIN_VALUE;
  @@ -74,7 +73,7 @@
         state.sentMessages.addLast( msg );
      }
   
  -   public void commit( Object xid, boolean onePhase )
  +   public void commit( Object xid, boolean onePhase ) 
         throws XAException, JMSException {
         TXState state = ( TXState )transactions.remove( xid );
         if ( state == null ) {
  @@ -159,6 +158,7 @@
   
      public void rollback( Object xid )
         throws XAException, JMSException {
  +
         TXState state = ( TXState )transactions.remove( xid );
         if ( state == null ) {
            throw new XAException( XAException.XAER_NOTA );
  @@ -206,6 +206,19 @@
         if ( !transactions.containsKey( xid ) ) {
            throw new XAException( XAException.XAER_NOTA );
         }
  +      return xid;
  +   }
  +
  +   public Object convertTx( Long anonXid, Xid xid ) throws XAException {
  +
  +      if ( !transactions.containsKey( anonXid ) ) {
  +         throw new XAException( XAException.XAER_NOTA );
  +      }
  +      if ( transactions.containsKey( xid ) ) {
  +         throw new XAException( XAException.XAER_DUPID );
  +      }
  +      TXState s = (TXState)transactions.remove( anonXid );
  +      transactions.put( xid,  s );
         return xid;
      }
   
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to