User: starksm 
  Date: 01/12/13 19:19:13

  Modified:    src/main/org/jboss/mq Tag: Branch_2_4
                        SpyMessageConsumer.java SpyXAResource.java
                        SpyXAResourceManager.java
  Log:
  Merge changes from main to improve the ASF behavior and update the logging.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.7.2.6   +97 -36    jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v
  retrieving revision 1.7.2.5
  retrieving revision 1.7.2.6
  diff -u -r1.7.2.5 -r1.7.2.6
  --- SpyMessageConsumer.java   2001/12/13 21:29:21     1.7.2.5
  +++ SpyMessageConsumer.java   2001/12/14 03:19:12     1.7.2.6
  @@ -11,28 +11,26 @@
   import javax.jms.Destination;
   import javax.jms.JMSException;
   import javax.jms.Message;
  -
   import javax.jms.MessageConsumer;
   import javax.jms.MessageListener;
   import javax.jms.Session;
   
   import org.jboss.logging.Logger;
  -import org.jboss.mq.selectors.Selector;
   
   /**
  - *  This class implements javax.jms.MessageConsumer
  + * This class implements <tt>javax.jms.MessageConsumer</tt>.
    *
    * @author     Norbert Lataille ([EMAIL PROTECTED])
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @author     David Maplesden ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.7.2.5 $
  + * @version    $Revision: 1.7.2.6 $
    */
   public class SpyMessageConsumer
      implements MessageConsumer, SpyConsumer, Runnable
   {
      static Logger log = Logger.getLogger( SpyMessageConsumer.class );
  -
  +   
      //Link to my session
      public SpySession session;
      // The subscription structure should be fill out by the decendent
  @@ -100,7 +98,7 @@
      // Public --------------------------------------------------------
      
      public String getMessageSelector()
  -   throws JMSException
  +      throws JMSException
      {
         if ( closed )
         {
  @@ -111,7 +109,7 @@
      }
      
      public MessageListener getMessageListener()
  -   throws JMSException
  +      throws JMSException
      {
         if ( closed )
         {
  @@ -150,7 +148,6 @@
         
         synchronized ( messages )
         {
  -         
            SpyMessage msg = session.connection.receive( subscription, 0 );
            if ( msg != null )
            {
  @@ -180,15 +177,17 @@
                     return mes;
                  }
                  if( log.isTraceEnabled() )
  -                  log.trace( "SpyMessageConsumer: receive in messages.wait()" );
  +                  log.trace("receive in messages.wait()");
                  messages.wait();
               }
  -         } catch ( InterruptedException e )
  +         }
  +         catch ( InterruptedException e )
            {
               JMSException newE = new SpyJMSException( "Receive interupted" );
               newE.setLinkedException( e );
               throw newE;
  -         } finally
  +         }
  +         finally
            {
               waitingForMessage = false;
               synchronized ( stateLock )
  @@ -201,7 +200,7 @@
      }
      
      public Message receive( long timeOut )
  -   throws JMSException
  +      throws JMSException
      {
         if ( timeOut == 0 )
         {
  @@ -230,7 +229,6 @@
         
         synchronized ( messages )
         {
  -         
            SpyMessage msg = session.connection.receive( subscription, timeOut );
            if ( msg != null )
            {
  @@ -270,7 +268,8 @@
                  messages.wait( att );
               }
               
  -         } catch ( InterruptedException e )
  +         }
  +         catch ( InterruptedException e )
            {
               JMSException newE = new SpyJMSException( "Receive interupted" );
               newE.setLinkedException( e );
  @@ -288,7 +287,7 @@
      }
      
      public Message receiveNoWait()
  -   throws JMSException
  +      throws JMSException
      {
         if ( closed )
         {
  @@ -321,9 +320,11 @@
      }
      
      public void close()
  -   throws JMSException
  +      throws JMSException
      {
         
  +      log.debug("Message consumer closing.");
  +      
         synchronized ( messages )
         {
            if ( closed )
  @@ -335,6 +336,16 @@
            messages.notify();
         }
         
  +      if ( listenerThread != null && !Thread.currentThread().equals(listenerThread) 
)
  +      {
  +         try
  +         {
  +            listenerThread.join();
  +         }
  +         catch(InterruptedException e)
  +         { }
  +      }
  +      
         if ( !sessionConsumer )
         {
            session.removeConsumer( this );
  @@ -355,28 +366,33 @@
            }
            
            //Add a message to the queue
  +         
  +         //  Consider removing this test (subscription.accepts).  I don't think it 
will ever fail
  +         //  because the test is also done by the server before message is even 
sent.
            if ( subscription.accepts( message ) )
            {
               if ( sessionConsumer )
               {
                  sessionConsumerProcessMessage( message );
  -            } else
  +            }
  +            else
               {
                  if ( waitingForMessage )
                  {
                     messages.addLast( message );
                     messages.notifyAll();
  -               } else
  +               }
  +               else
                  {
                     //unwanted message (due to consumer receive timing out) Nack it.
  +                  log.debug( "WARNING: NACK issued. The message consumer was not 
waiting for a message." );
                     session.connection.send( message.getAcknowledgementRequest( false 
) );
                  }
               }
            }
            else
            {
  -            if( log.isTraceEnabled() )
  -               log.trace( "WARNING: NACK issued. The subscription did not accept 
the message" );
  +            log.debug( "WARNING: NACK issued. The subscription did not accept the 
message" );
               session.connection.send( message.getAcknowledgementRequest( false ) );
            }
         }
  @@ -388,6 +404,7 @@
         SpyMessage mes = null;
         try
         {
  +         boolean trace = log.isTraceEnabled();
            outer :
               while ( true )
               {
  @@ -423,10 +440,15 @@
                           waitingForMessage = false;
                        }
                     }
  +
  +                  if( trace )
  +                     log.trace("Recevied msg: "+mes);
                     mes.session = session;
                     if ( mes.isOutdated() )
                     {
                        //Drop message (it has expired)
  +                     if( trace )
  +                        log.trace("Droping expired msg: "+mes);
                        mes.doAcknowledge();
                        mes = null;
                     }
  @@ -503,13 +525,12 @@
      }
      
      protected void sessionConsumerProcessMessage( SpyMessage message )
  -   throws JMSException
  +      throws JMSException
      {
         message.session = session;
         if ( message.isOutdated() )
         {
  -         if( log.isTraceEnabled() )
  -            log.trace( "SessionQueue: I dropped a message (timeout)" );
  +         log.debug( "I dropped a message (timeout)" );
            message.doAcknowledge();
            return;
         }
  @@ -519,6 +540,21 @@
         {
            thisListener = messageListener;
         }
  +      
  +      // Add the message to XAResource manager before we call onMessages since the
  +      // resource may get elisted IN the onMessage method.  This gives onMessage a 
chance to roll the message back.
  +      Object anonymousTXID=null;
  +      if ( session.transacted )
  +      {
  +         // Only happens with XA transactions
  +         if( session.currentTransactionId == null )
  +         {
  +            anonymousTXID = session.connection.spyXAResourceManager.startTx();
  +            session.currentTransactionId = anonymousTXID;
  +         }
  +         session.connection.spyXAResourceManager.ackMessage( 
session.currentTransactionId, message );
  +      }
  +      
         if ( thisListener != null )
         {
            Message mes = message;
  @@ -528,21 +564,45 @@
            }
            thisListener.onMessage( mes );
         }
  -      //ack
  -      if ( session.transacted )
  +      
  +      if (session.transacted)
         {
  -         session.connection.spyXAResourceManager.ackMessage( 
session.currentTransactionId, message );
  -      } else if ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE )
  +         // If we started an anonymous tx
  +         if (anonymousTXID != null)
  +         {
  +            if (session.currentTransactionId == anonymousTXID)
  +            {
  +               // This is bad..  We are an XA controled TX but no TM ever elisted 
us.
  +               // rollback the work and spit an error
  +               try
  +               {
  +                  session.connection.spyXAResourceManager.endTx(anonymousTXID, 
true);
  +                  session.connection.spyXAResourceManager.rollback(anonymousTXID);
  +               } catch (javax.transaction.xa.XAException e)
  +               {
  +                  log.error("Could not rollback", e);
  +               } finally
  +               {
  +                  session.currentTransactionId = null;
  +               }
  +               throw new SpyJMSException("Messaged delivery was not controled by a 
Transaction Manager");
  +            }
  +         }
  +      }
  +      else
         {
  -         message.doAcknowledge();
  +         // Should we Auto-ack the message since the message has now been 
processesed
  +         if (session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE)
  +         {
  +            message.doAcknowledge();
  +         }
         }
      }
      
      Message getMessage()
      {
         synchronized ( messages )
  -      {
  -         
  +      {   
            while ( true )
            {
               
  @@ -563,24 +623,24 @@
                  }
                  
                  return rc;
  -            } catch ( Exception e )
  +            }
  +            catch ( Exception e )
               {
                  e.printStackTrace();
               }
            }
         }
      }
  -   
  +
      Message preProcessMessage( SpyMessage message )
  -   throws JMSException
  +      throws JMSException
      {
         message.session = session;
         
         // Has the message expired?
         if ( message.isOutdated() )
         {
  -         if( log.isTraceEnabled() )
  -            log.trace( "SessionQueue: I dropped a message (timeout)" );
  +         log.debug( "I dropped a message (timeout)" );
            message.doAcknowledge();
            return null;
         }
  @@ -602,7 +662,8 @@
               return ( ( SpyEncapsulatedMessage )message ).getMessage();
            }
            return message;
  -      } else
  +      }
  +      else
         {
            return message;
         }
  
  
  
  1.2.2.2   +90 -49    jbossmq/src/main/org/jboss/mq/SpyXAResource.java
  
  Index: SpyXAResource.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResource.java,v
  retrieving revision 1.2.2.1
  retrieving revision 1.2.2.2
  diff -u -r1.2.2.1 -r1.2.2.2
  --- SpyXAResource.java        2001/08/23 03:57:08     1.2.2.1
  +++ SpyXAResource.java        2001/12/14 03:19:12     1.2.2.2
  @@ -16,24 +16,26 @@
    *
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.2.2.1 $
  + * @version    $Revision: 1.2.2.2 $
    */
  -public class SpyXAResource implements XAResource {
  -
  +public class SpyXAResource implements XAResource
  +{
  +   
      //////////////////////////////////////////////////////////////////
      // Attributes
      //////////////////////////////////////////////////////////////////
  -
  +   
      SpySession       session;
  -
  +   
      //////////////////////////////////////////////////////////////////
      // Constructors
      //////////////////////////////////////////////////////////////////
  -
  -   SpyXAResource( SpySession session ) {
  +   
  +   SpyXAResource( SpySession session )
  +   {
         this.session = session;
      }
  -
  +   
      /**
       *  setTransactionTimeout method comment.
       *
  @@ -44,10 +46,11 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public boolean setTransactionTimeout( int arg1 )
  -      throws javax.transaction.xa.XAException {
  +   throws javax.transaction.xa.XAException
  +   {
         return false;
      }
  -
  +   
      /**
       *  getTransactionTimeout method comment.
       *
  @@ -55,10 +58,11 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public int getTransactionTimeout()
  -      throws javax.transaction.xa.XAException {
  +   throws javax.transaction.xa.XAException
  +   {
         return 0;
      }
  -
  +   
      /**
       *  isSameRM method comment.
       *
  @@ -67,17 +71,19 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public boolean isSameRM( javax.transaction.xa.XAResource arg1 )
  -      throws javax.transaction.xa.XAException {
  -      if ( !( arg1 instanceof SpyXAResource ) ) {
  +   throws javax.transaction.xa.XAException
  +   {
  +      if ( !( arg1 instanceof SpyXAResource ) )
  +      {
            return false;
         }
         return ( ( SpyXAResource )arg1 ).session.connection.spyXAResourceManager == 
session.connection.spyXAResourceManager;
      }
  -
  +   
      //////////////////////////////////////////////////////////////////
      // Public Methods
      //////////////////////////////////////////////////////////////////
  -
  +   
      /**
       *  commit method comment.
       *
  @@ -86,14 +92,17 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public void commit( javax.transaction.xa.Xid xid, boolean onePhase )
  -      throws javax.transaction.xa.XAException {
  -      try {
  +   throws javax.transaction.xa.XAException
  +   {
  +      try
  +      {
            session.connection.spyXAResourceManager.commit( xid, onePhase );
  -      } catch ( JMSException e ) {
  +      } catch ( JMSException e )
  +      {
            throw new XAException( XAException.XAER_RMERR );
         }
      }
  -
  +   
      /**
       *  end method comment.
       *
  @@ -102,14 +111,18 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public void end( javax.transaction.xa.Xid xid, int flags )
  -      throws javax.transaction.xa.XAException {
  -      if ( session.currentTransactionId == null ) {
  +   throws javax.transaction.xa.XAException
  +   {
  +      if ( session.currentTransactionId == null )
  +      {
            throw new XAException( XAException.XAER_OUTSIDE );
         }
  -
  -      synchronized ( session.runLock ) {
  -
  -         switch ( flags ) {
  +      
  +      synchronized ( session.runLock )
  +      {
  +         
  +         switch ( flags )
  +         {
               case TMSUSPEND:
                  session.currentTransactionId = null;
                  session.connection.spyXAResourceManager.suspendTx( xid );
  @@ -125,7 +138,7 @@
            }
         }
      }
  -
  +   
      /**
       *  forget method comment.
       *
  @@ -133,9 +146,10 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public void forget( javax.transaction.xa.Xid arg1 )
  -      throws javax.transaction.xa.XAException {
  +   throws javax.transaction.xa.XAException
  +   {
      }
  -
  +   
      /**
       *  prepare method comment.
       *
  @@ -145,14 +159,17 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public int prepare( javax.transaction.xa.Xid xid )
  -      throws javax.transaction.xa.XAException {
  -      try {
  +   throws javax.transaction.xa.XAException
  +   {
  +      try
  +      {
            return session.connection.spyXAResourceManager.prepare( xid );
  -      } catch ( JMSException e ) {
  +      } catch ( JMSException e )
  +      {
            throw new XAException( XAException.XAER_RMERR );
         }
      }
  -
  +   
      /**
       *  recover method comment.
       *
  @@ -162,10 +179,11 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public Xid[] recover( int arg1 )
  -      throws javax.transaction.xa.XAException {
  +   throws javax.transaction.xa.XAException
  +   {
         return new Xid[0];
      }
  -
  +   
      /**
       *  rollback method comment.
       *
  @@ -173,14 +191,17 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public void rollback( javax.transaction.xa.Xid xid )
  -      throws javax.transaction.xa.XAException {
  -      try {
  +   throws javax.transaction.xa.XAException
  +   {
  +      try
  +      {
            session.connection.spyXAResourceManager.rollback( xid );
  -      } catch ( JMSException e ) {
  +      } catch ( JMSException e )
  +      {
            throw new XAException( XAException.XAER_RMERR );
         }
      }
  -
  +   
      /**
       *  start method comment.
       *
  @@ -189,16 +210,36 @@
       * @exception  javax.transaction.xa.XAException  Description of Exception
       */
      public void start( javax.transaction.xa.Xid xid, int flags )
  -      throws javax.transaction.xa.XAException {
  -      if ( session.currentTransactionId != null ) {
  -         throw new XAException( XAException.XAER_OUTSIDE );
  +   throws javax.transaction.xa.XAException
  +   {
  +      
  +      boolean convertTx=false;
  +      if ( session.currentTransactionId != null )
  +      {
  +         if( flags==TMNOFLAGS && session.currentTransactionId instanceof Long )
  +         {
  +            convertTx=true;
  +         } else
  +         {
  +            throw new XAException( XAException.XAER_OUTSIDE );
  +         }
         }
  -
  -      synchronized ( session.runLock ) {
  -
  -         switch ( flags ) {
  +      
  +      synchronized ( session.runLock )
  +      {
  +         
  +         switch ( flags )
  +         {
               case TMNOFLAGS:
  -               session.currentTransactionId = 
session.connection.spyXAResourceManager.startTx( xid );
  +               if( convertTx )
  +               {
  +                  // it was an anonymous TX, TM is now taking control over it.
  +                  // convert it over to a normal XID tansaction.
  +                  session.currentTransactionId = 
session.connection.spyXAResourceManager.convertTx( (Long)session.currentTransactionId, 
xid );
  +               } else
  +               {
  +                  session.currentTransactionId = 
session.connection.spyXAResourceManager.startTx( xid );
  +               }
                  break;
               case TMJOIN:
                  session.currentTransactionId = 
session.connection.spyXAResourceManager.joinTx( xid );
  @@ -208,8 +249,8 @@
                  break;
            }
            session.runLock.notify();
  -
  +         
         }
  -
  +      
      }
   }
  
  
  
  1.2.2.2   +110 -63   jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java
  
  Index: SpyXAResourceManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java,v
  retrieving revision 1.2.2.1
  retrieving revision 1.2.2.2
  diff -u -r1.2.2.1 -r1.2.2.2
  --- SpyXAResourceManager.java 2001/08/23 03:57:08     1.2.2.1
  +++ SpyXAResourceManager.java 2001/12/14 03:19:12     1.2.2.2
  @@ -20,88 +20,96 @@
    *
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.2.2.1 $
  + * @version    $Revision: 1.2.2.2 $
    */
  -public class SpyXAResourceManager implements java.io.Serializable {
  -
  +public class SpyXAResourceManager implements java.io.Serializable
  +{
  +   
      //////////////////////////////////////////////////////////////////
      // Attributes
      //////////////////////////////////////////////////////////////////
  -
      Connection       connection;
      Map              transactions = java.util.Collections.synchronizedMap( new 
HashMap() );
      long             nextInternalXid = Long.MIN_VALUE;
  -
  +   
      //Valid tx states:
      private final static byte TX_OPEN = 0;
      private final static byte TX_SUSPENDED = 2;
      private final static byte TX_PREPARED = 3;
      private final static byte TX_COMMITED = 4;
      private final static byte TX_ROLLEDBACK = 5;
  -
  +   
      private final static byte TX_ENDED = 1;
  -
  +   
      //////////////////////////////////////////////////////////////////
      // Constructors
      //////////////////////////////////////////////////////////////////
  -
  -   public SpyXAResourceManager( Connection conn ) {
  +   
  +   public SpyXAResourceManager( Connection conn )
  +   {
         super();
         connection = conn;
      }
  -
  -
  +   
  +   
      //////////////////////////////////////////////////////////////////
      // Public Methods
      //////////////////////////////////////////////////////////////////
  -
  +   
      public void ackMessage( Object xid, SpyMessage msg )
  -      throws JMSException {
  +      throws JMSException
  +   {
         TXState state = ( TXState )transactions.get( xid );
  -      if ( state == null ) {
  +      if ( state == null )
  +      {
            throw new JMSException( "Invalid transaction id." );
         }
  -      AcknowledgementRequest item = new AcknowledgementRequest();
  -      item.destination = msg.getJMSDestination();
  -      item.messageID = msg.getJMSMessageID();
  -      item.isAck = true;
  -      item.subscriberId = msg.routeToSubscriber;
  +      AcknowledgementRequest item = msg.getAcknowledgementRequest(true);
         state.ackedMessages.addLast( item );
      }
  -
  +   
      public void addMessage( Object xid, SpyMessage msg )
  -      throws JMSException {
  +      throws JMSException
  +   {
         TXState state = ( TXState )transactions.get( xid );
  -      if ( state == null ) {
  +      if ( state == null )
  +      {
            throw new JMSException( "Invalid transaction id." );
         }
         state.sentMessages.addLast( msg );
      }
  -
  +   
      public void commit( Object xid, boolean onePhase )
  -      throws XAException, JMSException {
  +      throws XAException, JMSException
  +   {
         TXState state = ( TXState )transactions.remove( xid );
  -      if ( state == null ) {
  +      if ( state == null )
  +      {
            throw new XAException( XAException.XAER_NOTA );
         }
  -
  -      if ( onePhase ) {
  +      
  +      if ( onePhase )
  +      {
            TransactionRequest transaction = new TransactionRequest();
            transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
            transaction.xid = null;
  -         if ( state.sentMessages.size() != 0 ) {
  +         if ( state.sentMessages.size() != 0 )
  +         {
               SpyMessage job[] = new SpyMessage[state.sentMessages.size()];
               job = ( SpyMessage[] )state.sentMessages.toArray( job );
               transaction.messages = job;
            }
  -         if ( state.ackedMessages.size() != 0 ) {
  +         if ( state.ackedMessages.size() != 0 )
  +         {
               AcknowledgementRequest job[] = new 
AcknowledgementRequest[state.ackedMessages.size()];
               job = ( AcknowledgementRequest[] )state.ackedMessages.toArray( job );
               transaction.acks = job;
            }
            connection.send( transaction );
  -      } else {
  -         if ( state.txState != TX_PREPARED ) {
  +      } else
  +      {
  +         if ( state.txState != TX_PREPARED )
  +         {
               throw new XAException( "The transaction had not been prepared" );
            }
            TransactionRequest transaction = new TransactionRequest();
  @@ -111,39 +119,47 @@
         }
         state.txState = TX_COMMITED;
      }
  -
  +   
      public void endTx( Object xid, boolean success )
  -      throws XAException {
  +      throws XAException
  +   {
         TXState state = ( TXState )transactions.get( xid );
  -      if ( state == null ) {
  +      if ( state == null )
  +      {
            throw new XAException( XAException.XAER_NOTA );
         }
         state.txState = TX_ENDED;
      }
  -
  +   
      public Object joinTx( Xid xid )
  -      throws XAException {
  -      if ( !transactions.containsKey( xid ) ) {
  +      throws XAException
  +   {
  +      if ( !transactions.containsKey( xid ) )
  +      {
            throw new XAException( XAException.XAER_NOTA );
         }
         return xid;
      }
  -
  +   
      public int prepare( Object xid )
  -      throws XAException, JMSException {
  +      throws XAException, JMSException
  +   {
         TXState state = ( TXState )transactions.get( xid );
  -      if ( state == null ) {
  +      if ( state == null )
  +      {
            throw new XAException( XAException.XAER_NOTA );
         }
         TransactionRequest transaction = new TransactionRequest();
         transaction.requestType = transaction.TWO_PHASE_COMMIT_PREPARE_REQUEST;
         transaction.xid = xid;
  -      if ( state.sentMessages.size() != 0 ) {
  +      if ( state.sentMessages.size() != 0 )
  +      {
            SpyMessage job[] = new SpyMessage[state.sentMessages.size()];
            job = ( SpyMessage[] )state.sentMessages.toArray( job );
            transaction.messages = job;
         }
  -      if ( state.ackedMessages.size() != 0 ) {
  +      if ( state.ackedMessages.size() != 0 )
  +      {
            AcknowledgementRequest job[] = new 
AcknowledgementRequest[state.ackedMessages.size()];
            job = ( AcknowledgementRequest[] )state.ackedMessages.toArray( job );
            transaction.acks = job;
  @@ -152,36 +168,45 @@
         state.txState = TX_PREPARED;
         return javax.transaction.xa.XAResource.XA_OK;
      }
  -
  +   
      public Object resumeTx( Xid xid )
  -      throws XAException {
  -      if ( !transactions.containsKey( xid ) ) {
  +      throws XAException
  +   {
  +      if ( !transactions.containsKey( xid ) )
  +      {
            throw new XAException( XAException.XAER_NOTA );
         }
         return xid;
      }
  -
  +   
      public void rollback( Object xid )
  -      throws XAException, JMSException {
  +      throws XAException, JMSException
  +   {
  +      
         TXState state = ( TXState )transactions.remove( xid );
  -      if ( state == null ) {
  +      if ( state == null )
  +      {
            throw new XAException( XAException.XAER_NOTA );
         }
  -      if ( state.txState != TX_PREPARED ) {
  +      if ( state.txState != TX_PREPARED )
  +      {
            TransactionRequest transaction = new TransactionRequest();
            transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
            transaction.xid = null;
  -         if ( state.ackedMessages.size() != 0 ) {
  +         if ( state.ackedMessages.size() != 0 )
  +         {
               AcknowledgementRequest job[] = new 
AcknowledgementRequest[state.ackedMessages.size()];
               job = ( AcknowledgementRequest[] )state.ackedMessages.toArray( job );
               transaction.acks = job;
               //Neg Acknowlege all consumed messages
  -            for ( int i = 0; i < transaction.acks.length; i++ ) {
  +            for ( int i = 0; i < transaction.acks.length; i++ )
  +            {
                  transaction.acks[i].isAck = false;
               }
            }
            connection.send( transaction );
  -      } else {
  +      } else
  +      {
            TransactionRequest transaction = new TransactionRequest();
            transaction.xid = xid;
            transaction.requestType = transaction.TWO_PHASE_COMMIT_ROLLBACK_REQUEST;
  @@ -189,38 +214,60 @@
         }
         state.txState = TX_ROLLEDBACK;
      }
  -
  -   public synchronized Object startTx() {
  +   
  +   public synchronized Object startTx()
  +   {
         Long newXid = new Long( nextInternalXid++ );
         transactions.put( newXid, new TXState() );
         return newXid;
      }
  -
  +   
      public Object startTx( Xid xid )
  -      throws XAException {
  -      if ( transactions.containsKey( xid ) ) {
  +      throws XAException
  +   {
  +      if ( transactions.containsKey( xid ) )
  +      {
            throw new XAException( XAException.XAER_DUPID );
         }
         transactions.put( xid, new TXState() );
         return xid;
      }
  -
  +   
      public Object suspendTx( Xid xid )
  -      throws XAException {
  -      if ( !transactions.containsKey( xid ) ) {
  +      throws XAException
  +   {
  +      if ( !transactions.containsKey( xid ) )
  +      {
            throw new XAException( XAException.XAER_NOTA );
         }
         return xid;
      }
  -
  +   
  +   public Object convertTx( Long anonXid, Xid xid ) throws XAException
  +   {
  +      
  +      if ( !transactions.containsKey( anonXid ) )
  +      {
  +         throw new XAException( XAException.XAER_NOTA );
  +      }
  +      if ( transactions.containsKey( xid ) )
  +      {
  +         throw new XAException( XAException.XAER_DUPID );
  +      }
  +      TXState s = (TXState)transactions.remove( anonXid );
  +      transactions.put( xid,  s );
  +      return xid;
  +   }
  +   
      //////////////////////////////////////////////////////////////////
      // Helper Inner classes
      //////////////////////////////////////////////////////////////////
  -
  +   
      /**
       * @created    August 16, 2001
       */
  -   class TXState {
  +   class TXState
  +   {
         byte          txState = TX_OPEN;
         LinkedList    sentMessages = new LinkedList();
         LinkedList    ackedMessages = new LinkedList();
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to