Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 Fri May 12 12:16:32 2006
@@ -20,6 +20,7 @@
 import javax.xml.namespace.QName;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContextFactory;
@@ -34,6 +35,7 @@
 import org.apache.sandesha2.msgprocessors.MsgProcessor;
 import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.wsrm.Sequence;
@@ -63,42 +65,71 @@
                        log.debug(message);
                        throw new AxisFault(message);
                }
-               
-               // getting rm message
-               RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(msgCtx);
 
-               String DONE = (String) msgCtx
-                               
.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+               String DONE = (String) 
msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
                if (null != DONE && "true".equals(DONE))
                        return;
 
-               
msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-               
-               String dummyMessageString = (String) 
msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
-               boolean dummyMessage = false;
-               if (dummyMessageString!=null && 
Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
-                       dummyMessage = true;
-               
+               
msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context);
 
-               MsgProcessor msgProcessor = null;
-               int messageType = rmMsgCtx.getMessageType();
-               if (messageType==Sandesha2Constants.MessageTypes.UNKNOWN) {
-                       MessageContext requestMsgCtx = 
msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
-                       if (requestMsgCtx!=null) {  //for the server side
-                               RMMsgContext reqRMMsgCtx = 
MsgInitializer.initializeMessage(requestMsgCtx);
-                               Sequence sequencePart = (Sequence) 
reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-                               if (sequencePart!=null)
-                                       msgProcessor = new 
ApplicationMsgProcessor ();// a rm intended message.
-                       } else if (!msgCtx.isServerSide()) //if client side.
-                           msgProcessor = new ApplicationMsgProcessor ();
-               }else  {
-                       msgProcessor = 
MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+               boolean withinTransaction = false;
+               String withinTransactionStr = (String) 
msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+               if (withinTransactionStr != null && 
Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+                       withinTransaction = true;
                }
+
+               Transaction transaction = null;
+               if (!withinTransaction) {
+                       transaction = storageManager.getTransaction();
+                       
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_TRUE);
+               }
+               boolean rolebacked = false;
                
-               if (msgProcessor!=null)
-                       msgProcessor.processOutMessage(rmMsgCtx);
-       
+               try {
+                       // getting rm message
+                       RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(msgCtx);
+
+                       String dummyMessageString = (String) 
msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
+                       boolean dummyMessage = false;
+                       if (dummyMessageString != null && 
Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
+                               dummyMessage = true;
+
+                       MsgProcessor msgProcessor = null;
+                       int messageType = rmMsgCtx.getMessageType();
+                       if (messageType == 
Sandesha2Constants.MessageTypes.UNKNOWN) {
+                               MessageContext requestMsgCtx = 
msgCtx.getOperationContext().getMessageContext(
+                                               
OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+                               if (requestMsgCtx != null) { // for the server 
side
+                                       RMMsgContext reqRMMsgCtx = 
MsgInitializer.initializeMessage(requestMsgCtx);
+                                       Sequence sequencePart = (Sequence) 
reqRMMsgCtx
+                                                       
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+                                       if (sequencePart != null)
+                                               msgProcessor = new 
ApplicationMsgProcessor();// a rm
+                                                                               
                                                                // intended
+                                                                               
                                                                // message.
+                               } else if (!msgCtx.isServerSide()) // if client 
side.
+                                       msgProcessor = new 
ApplicationMsgProcessor();
+                       } else {
+                               msgProcessor = 
MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+                       }
+
+                       if (msgProcessor != null)
+                               msgProcessor.processOutMessage(rmMsgCtx);
+
+               } catch (Exception e) {
+                       if (!withinTransaction) {
+                               transaction.rollback();
+                               
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_FALSE);
+                               rolebacked = true;
+                       }
+               } finally {
+                       if (!withinTransaction && !rolebacked) {
+                               transaction.commit();
+                               
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_FALSE);
+                       }
+               }
        }
 
        public QName getName() {

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 Fri May 12 12:16:32 2006
@@ -39,7 +39,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -64,7 +63,6 @@
        
        public void processInMessage(RMMsgContext rmMsgCtx) throws 
SandeshaException {
                
-               
                AckRequested ackRequested = (AckRequested) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
                if (ackRequested==null) {
                        throw new SandeshaException ("Message identified as of 
type ackRequested does not have an AckRequeted element");
@@ -79,9 +77,11 @@
                String sequenceID = 
ackRequested.getIdentifier().getIdentifier();
                
                ConfigurationContext configurationContext = 
rmMsgCtx.getMessageContext().getConfigurationContext();
+               
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext);
-               SequencePropertyBeanMgr seqPropMgr = 
storageManager.getSequencePropretyBeanMgr();
                
+               SequencePropertyBeanMgr seqPropMgr = 
storageManager.getSequencePropretyBeanMgr();
+                       
                //Setting the ack depending on AcksTo.
                SequencePropertyBean acksToBean = 
seqPropMgr.retrieve(sequenceID,
                                
Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
@@ -139,8 +139,6 @@
                ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION,
                                
msgContext.getProperty(AddressingConstants.WS_ADDRESSING_VERSION));  //TODO do 
this in the RMMsgCreator
                
-//             RMMsgContext ackRMMsgCtx = rmm
-
                String addressingNamespaceURI = 
SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configurationContext);
                String anonymousURI = 
SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
                
@@ -170,15 +168,15 @@
 
                        rmMsgCtx.getMessageContext().setProperty(
                                        Sandesha2Constants.ACK_WRITTEN, "true");
+                       
                        try {
                                engine.send(ackRMMsgCtx.getMessageContext());
                        } catch (AxisFault e1) {
                                throw new SandeshaException(e1.getMessage());
                        }
+                       
                } else {
 
-                       Transaction asyncAckTransaction = 
storageManager.getTransaction();
-
                        SenderBeanMgr retransmitterBeanMgr = storageManager
                                        .getRetransmitterBeanMgr();
 
@@ -238,8 +236,6 @@
                        //inserting the new ack.
                        retransmitterBeanMgr.insert(ackBean);
 
-                       asyncAckTransaction.commit();
-
                        //passing the message through sandesha2sender
 
                        
ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
@@ -255,6 +251,7 @@
                        } catch (AxisFault e) {
                                throw new SandeshaException (e.getMessage());
                        }
+                       
                        
                        
SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
                        

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 Fri May 12 12:16:32 2006
@@ -31,7 +31,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -65,6 +64,8 @@
                        throw new SandeshaException(message);
                }
                
+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
+               
                MessageContext msgCtx = rmMsgCtx.getMessageContext();
                ConfigurationContext configCtx = 
msgCtx.getConfigurationContext();
                
@@ -72,16 +73,12 @@
                sequenceAck.setMustUnderstand(false);
                rmMsgCtx.addSOAPEnvelope();
 
-               StorageManager storageManager = SandeshaUtil
-                               
.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
-                                               .getConfigurationContext());
+
                SenderBeanMgr retransmitterMgr = storageManager
                                .getRetransmitterBeanMgr();
                SequencePropertyBeanMgr seqPropMgr = storageManager
                                .getSequencePropretyBeanMgr();
 
-
-
                Iterator ackRangeIterator = 
sequenceAck.getAcknowledgementRanges()
                                .iterator();
 
@@ -96,6 +93,7 @@
                FaultManager faultManager = new FaultManager();
                RMMsgContext faultMessageContext = 
faultManager.checkForUnknownSequence(rmMsgCtx,outSequenceId);
                if (faultMessageContext != null) {
+                       
                        ConfigurationContext configurationContext = 
msgCtx.getConfigurationContext();
                        AxisEngine engine = new 
AxisEngine(configurationContext);
                        
@@ -105,11 +103,13 @@
                                throw new SandeshaException ("Could not send 
the fault message",e);
                        }
                        
+                       msgCtx.pause();
                        return;
                }
                
                faultMessageContext = 
faultManager.checkForInvalidAcknowledgement(rmMsgCtx);
                if (faultMessageContext != null) {
+                       
                        ConfigurationContext configurationContext = 
msgCtx.getConfigurationContext();
                        AxisEngine engine = new 
AxisEngine(configurationContext);
                        
@@ -119,25 +119,22 @@
                                throw new SandeshaException ("Could not send 
the fault message",e);
                        }
                        
+                       msgCtx.pause();
                        return;
                }
                
         String internalSequenceID = 
SandeshaUtil.getSequenceProperty(outSequenceId,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configCtx);
                
         //updating the last activated time of the sequence.
-               Transaction lastUpdatedTimeTransaction = 
storageManager.getTransaction();
                
SequenceManager.updateLastActivatedTime(internalSequenceID,rmMsgCtx.getMessageContext().getConfigurationContext());
-               lastUpdatedTimeTransaction.commit();
                
-               //Starting transaction
-               Transaction ackTransaction = storageManager.getTransaction();
-
                SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
                                outSequenceId, 
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
 
                if (internalSequenceBean == null || 
internalSequenceBean.getValue() == null) {
                        String message = "TempSequenceId is not set correctly";
                        log.debug(message);
+                       
                        throw new SandeshaException(message);
                }
 
@@ -218,9 +215,6 @@
                allCompletedMsgsBean.setValue(str);
                
                seqPropMgr.update(allCompletedMsgsBean);                
-               
-               //commiting transaction
-               ackTransaction.commit();
                
                String lastOutMsgNoStr = 
SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,configCtx);
                if (lastOutMsgNoStr!=null ) {

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Fri May 12 12:16:32 2006
@@ -42,7 +42,6 @@
 import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
@@ -127,8 +126,6 @@
                                
.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
                                                .getConfigurationContext());
 
-
-
                FaultManager faultManager = new FaultManager();
                RMMsgContext faultMessageContext = 
faultManager.checkForLastMsgNumberExceeded(rmMsgCtx);
                if (faultMessageContext != null) {
@@ -141,6 +138,7 @@
                                throw new SandeshaException ("Could not send 
the fault message",e);
                        }
                        
+                       msgCtx.pause();
                        return;
                }
                
@@ -169,6 +167,7 @@
                                throw new SandeshaException ("Could not send 
the fault message",e);
                        }
                        
+                       msgCtx.pause();
                        return;
                }
                
@@ -191,14 +190,8 @@
                        return;
                }
 
-               Transaction lastUpdatedTimeTransaction = 
storageManager.getTransaction();
-               
                //updating the last activated time of the sequence.
                SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
-               lastUpdatedTimeTransaction.commit();
-               
-               Transaction updataMsgStringTransaction = storageManager
-                               .getTransaction();
                
                SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
                                
Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
@@ -263,16 +256,13 @@
                msgsBean.setValue(messagesStr);
                seqPropMgr.update(msgsBean);
 
-               updataMsgStringTransaction.commit();
-
-               Transaction invokeTransaction = storageManager.getTransaction();
-
                //      Pause the messages bean if not the right message to 
invoke.
                NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
                NextMsgBean bean = mgr.retrieve(sequenceId);
 
-               if (bean == null)
+               if (bean == null) {
                        throw new SandeshaException("Error- The sequence does 
not exist");
+               }
 
                InvokerBeanMgr storageMapMgr = 
storageManager.getStorageMapBeanMgr();
                
@@ -281,8 +271,6 @@
                
                if (inOrderInvocation) {
                        
-                       //pause the message
-                       rmMsgCtx.pause();
                        
                        SequencePropertyBean incomingSequenceListBean = 
(SequencePropertyBean) seqPropMgr
                                        .retrieve(
@@ -299,6 +287,7 @@
                                
incomingSequenceListBean.setValue(incomingSequenceList
                                                .toString());
 
+                               //this get inserted before
                                seqPropMgr.insert(incomingSequenceListBean);
                        }
 
@@ -312,7 +301,7 @@
                                //saving the property.
                                
incomingSequenceListBean.setValue(incomingSequenceList
                                                .toString());
-                               seqPropMgr.insert(incomingSequenceListBean);
+                               seqPropMgr.update(incomingSequenceListBean);
                        }
 
                        //saving the message.
@@ -330,6 +319,9 @@
                        } catch (Exception ex) {
                                throw new SandeshaException(ex.getMessage());
                        }
+                       
+                       //pause the message
+                       rmMsgCtx.pause();
 
                        //Starting the invoker if stopped.
                        SandeshaUtil
@@ -337,8 +329,6 @@
 
                }
 
-               invokeTransaction.commit();
-
                //Sending acknowledgements
                sendAckIfNeeded(rmMsgCtx, messagesStr);
 
@@ -367,7 +357,7 @@
                                
.getSandeshaStorageManager(msgCtx.getConfigurationContext());
                SequencePropertyBeanMgr seqPropMgr = storageManager
                                .getSequencePropretyBeanMgr();
-
+               
                Sequence sequence = (Sequence) rmMsgCtx
                                
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
                String sequenceId = sequence.getIdentifier().getIdentifier();
@@ -416,7 +406,6 @@
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);
                SequencePropertyBeanMgr seqPropMgr = 
storageManager.getSequencePropretyBeanMgr();
 
-               Transaction outHandlerTransaction = 
storageManager.getTransaction();
                boolean serverSide = msgContext.isServerSide();  
 
                // setting message Id if null
@@ -590,16 +579,18 @@
                                throw new SandeshaException (e);
                        }
                        
-                       if (requestMessageContext==null) 
+                       if (requestMessageContext==null) {
                                throw new SandeshaException ("Request message 
context is null, cant find out the request side sequenceID");
+                       }
                        
                        RMMsgContext requestRMMsgCtx = 
MsgInitializer.initializeMessage(requestMessageContext);
                        Sequence sequence = (Sequence) 
requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
                        
                        String requestSequenceID = 
sequence.getIdentifier().getIdentifier();
                        SequencePropertyBean specVersionBean = 
seqPropMgr.retrieve(requestSequenceID,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
-                       if (specVersionBean==null) 
+                       if (specVersionBean==null) {
                                throw new SandeshaException ("SpecVersion 
sequence property bean is not available for the incoming sequence. Cant find 
the RM version for outgoing side");
+                       }
                        
                        specVersion = specVersionBean.getValue();
                } else {
@@ -744,8 +735,6 @@
                        processResponseMessage(rmMsgCtx, internalSequenceId, 
messageNumber,storageKey);
                
                msgContext.pause();  // the execution will be stopped.
-               outHandlerTransaction.commit();         
-
        }
        
        private void addCreateSequenceMessage(RMMsgContext applicationRMMsg,

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 Fri May 12 12:16:32 2006
@@ -11,7 +11,6 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
@@ -45,13 +44,12 @@
                                throw new SandeshaException ("Could not send 
the fault message",e);
                        }
                        
+                       msgCtx.pause();
                        return;
                }
                
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configCtx);
                
-               Transaction closeSequenceTransaction = 
storageManager.getTransaction();
-               
                SequencePropertyBeanMgr sequencePropMgr = 
storageManager.getSequencePropretyBeanMgr();
                SequencePropertyBean sequenceClosedBean = new 
SequencePropertyBean ();
                sequenceClosedBean.setSequenceID(sequenceID);
@@ -107,9 +105,6 @@
                        String message = "Could not send the terminate sequence 
response";
                        throw new SandeshaException (message,e);
                }
-               
-               
-               closeSequenceTransaction.commit();
        }
        
        public void processOutMessage(RMMsgContext rmMsgCtx) throws 
SandeshaException {

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 Fri May 12 12:16:32 2006
@@ -34,7 +34,6 @@
 import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
@@ -82,6 +81,7 @@
                                throw new SandeshaException ("Could not send 
the fault message",e);
                        }
                        
+                       createSeqMsg.pause();
                        return;
                }
 
@@ -92,7 +92,6 @@
                
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context);
                SequencePropertyBeanMgr seqPropMgr = 
storageManager.getSequencePropretyBeanMgr();
-               Transaction createSequenceTransaction = 
storageManager.getTransaction();   //begining of a new transaction
 
                try {
                        String newSequenceId = 
SequenceManager.setupNewSequence(createSeqRMMsg);  //newly created sequnceID.
@@ -170,11 +169,8 @@
                        outMessage.setResponseWritten(true);
 
                        //commiting tr. before sending the response msg.
-                       createSequenceTransaction.commit();
                        
-                       Transaction updateLastActivatedTransaction = 
storageManager.getTransaction();
                        
SequenceManager.updateLastActivatedTime(newSequenceId,createSeqRMMsg.getMessageContext().getConfigurationContext());
-                       updateLastActivatedTransaction.commit();
                        
                        AxisEngine engine = new AxisEngine(context);
                        engine.send(outMessage);

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 Fri May 12 12:16:32 2006
@@ -75,7 +75,7 @@
                        .getSandeshaStorageManager(configCtx);
                
                //Processing for ack if available
-               Transaction ackProcessTransaction = 
storageManager.getTransaction();
+///            Transaction ackProcessTransaction = 
storageManager.getTransaction();
                
                SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) 
createSeqResponseRMMsgCtx
                                
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
@@ -84,11 +84,11 @@
                        
ackProcessor.processInMessage(createSeqResponseRMMsgCtx);
                }
 
-               ackProcessTransaction.commit();
+///            ackProcessTransaction.commit();
                
                //Processing the create sequence response.
                
-               Transaction createSeqResponseTransaction = 
storageManager.getTransaction();
+///            Transaction createSeqResponseTransaction = 
storageManager.getTransaction();
                
                CreateSequenceResponse createSeqResponsePart = 
(CreateSequenceResponse) createSeqResponseRMMsgCtx
                                
.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
@@ -154,10 +154,10 @@
                sequencePropMgr.insert(outSequenceBean);
                sequencePropMgr.insert(internalSequenceBean);
 
-               createSeqResponseTransaction.commit();
+///            createSeqResponseTransaction.commit();
                
                
-               Transaction offerProcessTransaction = 
storageManager.getTransaction();
+///            Transaction offerProcessTransaction = 
storageManager.getTransaction();
                
                //processing for accept (offer has been sent)
                Accept accept = createSeqResponsePart.getAccept();
@@ -217,9 +217,9 @@
                        
                }
 
-               offerProcessTransaction.commit();
+///            offerProcessTransaction.commit();
                
-               Transaction updateAppMessagesTransaction = 
storageManager.getTransaction();
+///            Transaction updateAppMessagesTransaction = 
storageManager.getTransaction();
                
                SenderBean target = new SenderBean();
                target.setInternalSequenceID(internalSequenceId);
@@ -282,11 +282,11 @@
                        storageManager.updateMessageContext(key,applicationMsg);
                }
 
-               updateAppMessagesTransaction.commit();
+///            updateAppMessagesTransaction.commit();
                
-               Transaction lastUpdatedTimeTransaction = 
storageManager.getTransaction();
+///            Transaction lastUpdatedTimeTransaction = 
storageManager.getTransaction();
                
SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx);
-               lastUpdatedTimeTransaction.commit();
+///            lastUpdatedTimeTransaction.commit();
                
                
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
                                
.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Fri May 12 12:16:32 2006
@@ -39,7 +39,6 @@
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -106,6 +105,8 @@
                        } catch (AxisFault e) {
                                throw new SandeshaException ("Could not send 
the fault message",e);
                        }
+                       
+                       terminateSeqMsg.pause();
                        return;
                }
                
@@ -113,7 +114,6 @@
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context);
                SequencePropertyBeanMgr sequencePropertyBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
 
-               Transaction terminateReceivedTransaction = 
storageManager.getTransaction();
                SequencePropertyBean terminateReceivedBean = new 
SequencePropertyBean ();
                terminateReceivedBean.setSequenceID(sequenceId);
                
terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
@@ -127,9 +127,6 @@
                
                
setUpHighestMsgNumbers(context,storageManager,sequenceId,terminateSeqRMMsg);
                
-               terminateReceivedTransaction.commit();
-               
-               Transaction terminateTransaction = 
storageManager.getTransaction();
                
TerminateManager.cleanReceivingSideOnTerminateMessage(context,sequenceId);
                
                
@@ -139,16 +136,12 @@
                sequencePropertyBeanMgr.insert(terminatedBean);
                
                
-               terminateTransaction.commit(); 
-               
                //removing an entry from the listener
                String transport = 
terminateSeqMsg.getTransportIn().getName().getLocalPart();
        
-               Transaction lastUpdatedTransaction = 
storageManager.getTransaction();
                SequenceManager.updateLastActivatedTime(sequenceId,context);
                
-               lastUpdatedTransaction.commit();                
-               terminateSeqRMMsg.pause();
+               terminateSeqMsg.pause();
        }
 
        private void setUpHighestMsgNumbers (ConfigurationContext configCtx, 
StorageManager storageManager, String sequenceID, RMMsgContext terminateRMMsg) 
throws SandeshaException {
@@ -281,7 +274,7 @@
         if (outSequenceID==null)
                throw new SandeshaException ("SequenceID was not found. Cannot 
send the terminate message");
         
-               Transaction addTerminateSeqTransaction = 
storageManager.getTransaction();
+///            Transaction addTerminateSeqTransaction = 
storageManager.getTransaction();
                
                String terminated = 
SandeshaUtil.getSequenceProperty(outSequenceID,
                                
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED,configurationContext);
@@ -384,7 +377,7 @@
                rmMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
                
rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
                rmMsgCtx.getMessageContext().setTransportOut(new 
Sandesha2TransportOutDesc ());
-               addTerminateSeqTransaction.commit();
+///            addTerminateSeqTransaction.commit();
                
            AxisEngine engine = new AxisEngine (configurationContext);
            try {

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
 Fri May 12 12:16:32 2006
@@ -291,7 +291,7 @@
                        
                } else {
 
-                       Transaction asyncAckTransaction = 
storageManager.getTransaction();
+///                    Transaction asyncAckTransaction = 
storageManager.getTransaction();
 
                        SenderBeanMgr retransmitterBeanMgr = storageManager
                                        .getRetransmitterBeanMgr();
@@ -338,7 +338,7 @@
                        
                        //inserting the new ack.
                        retransmitterBeanMgr.insert(ackBean);
-                       asyncAckTransaction.commit();
+///                    asyncAckTransaction.commit();
 
                        //passing the message through sandesha2sender
                        
ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java 
Fri May 12 12:16:32 2006
@@ -411,6 +411,9 @@
                        } else {
                                SequencePropertyBeanMgr seqPropMgr = 
storageManager
                                                .getSequencePropretyBeanMgr();
+                               
+                               //TODO get the acksTo value using the property 
key.
+                               
                                String sequenceId = data.getSequenceId();
                                SequencePropertyBean acksToBean = 
seqPropMgr.retrieve(
                                                sequenceId, 
Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
 Fri May 12 12:16:32 2006
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.client.SandeshaClient;
 import org.apache.sandesha2.client.SandeshaClientConstants;
@@ -132,6 +133,8 @@
        
        private void finalizeTimedOutSequence (String internalSequenceID, 
String sequenceID ,MessageContext messageContext) throws SandeshaException {
                ConfigurationContext configurationContext = 
messageContext.getConfigurationContext();
+               
+               
configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,messageContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
                SequenceReport report = 
SandeshaClient.getOutgoingSequenceReport(internalSequenceID 
,configurationContext);
                
TerminateManager.timeOutSendingSideSequence(configurationContext,internalSequenceID,
 false);
                

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java 
Fri May 12 12:16:32 2006
@@ -166,7 +166,7 @@
                try {
                        processor.setup();
                } catch (NoSuchMethodException e) {
-                       throw new SandeshaException(e.getMessage());
+                       throw new SandeshaException(e);
                }
 
                processor.processPolicy(policy);

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java 
Fri May 12 12:16:32 2006
@@ -419,7 +419,7 @@
        
        public static long getOutGoingSequenceAckedMessageCount (String 
internalSequenceID,ConfigurationContext configurationContext) throws 
SandeshaException {
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext);
-               Transaction transaction = storageManager.getTransaction();
+///            Transaction transaction = storageManager.getTransaction();
                SequencePropertyBeanMgr seqPropBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
                
                SequencePropertyBean findSeqIDBean = new SequencePropertyBean 
();
@@ -447,14 +447,14 @@
                        return 0; //No acknowledgement has been received yet.
                
                long noOfMessagesAcked = 
Long.parseLong(ackedMsgBean.getValue());
-               transaction.commit();
+///            transaction.commit();
                
                return noOfMessagesAcked;
        }
        
        public static boolean isOutGoingSequenceCompleted (String 
internalSequenceID,ConfigurationContext configurationContext) throws 
SandeshaException {
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext);
-               Transaction transaction = storageManager.getTransaction();
+///            Transaction transaction = storageManager.getTransaction();
                SequencePropertyBeanMgr seqPropBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
                
                SequencePropertyBean findSeqIDBean = new SequencePropertyBean 
();
@@ -484,14 +484,14 @@
                if ("true".equals(terminateAddedBean.getValue()))
                        return true;
 
-               transaction.commit();
+///            transaction.commit();
                return false;
        }
        
        public static boolean isIncomingSequenceCompleted (String sequenceID, 
ConfigurationContext configurationContext) throws SandeshaException {
                
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext);
-               Transaction transaction = storageManager.getTransaction();
+///            Transaction transaction = storageManager.getTransaction();
                SequencePropertyBeanMgr seqPropBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
                
                SequencePropertyBean terminateReceivedBean = 
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
@@ -500,7 +500,7 @@
                if (terminateReceivedBean!=null && 
"true".equals(terminateReceivedBean.getValue()))
                        complete = true;
                
-               transaction.commit();
+///            transaction.commit();
                return complete;
        }
        

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java 
Fri May 12 12:16:32 2006
@@ -240,8 +240,8 @@
                if 
(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
                        deleatable = false;
                
-               if 
(Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
-                       deleatable = false;
+//             if 
(Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
+//                     deleatable = false;
                
                if 
(Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED.equals(name))
                        deleatable = false;
@@ -316,6 +316,7 @@
                        SequencePropertyBean sequencePropertyBean = 
(SequencePropertyBean) iterator.next();
                        doUpdatesIfNeeded 
(outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
                        
+                       //TODO all properties which have the temm:Seq:id as the 
key should be deletable.
                        if 
(isProportyDeletable(sequencePropertyBean.getName())) {
                                
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
                        }
@@ -330,7 +331,7 @@
                StorageManager storageManager = SandeshaUtil
                                
.getSandeshaStorageManager(configurationContext);
 
-               Transaction addTerminateSeqTransaction = 
storageManager.getTransaction();
+///            Transaction addTerminateSeqTransaction = 
storageManager.getTransaction();
                
                SequencePropertyBeanMgr seqPropMgr = storageManager
                                .getSequencePropretyBeanMgr();
@@ -424,7 +425,7 @@
                
terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
                
terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
                terminateRMMessage.getMessageContext().setTransportOut(new 
Sandesha2TransportOutDesc ());
-               addTerminateSeqTransaction.commit();
+///            addTerminateSeqTransaction.commit();
                
            AxisEngine engine = new AxisEngine (configurationContext);
            try {

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java 
Fri May 12 12:16:32 2006
@@ -100,43 +100,44 @@
                                log.debug(ex.getMessage());
                        }
 
+                       Transaction transaction = null;
+                       boolean rolebacked = false;
+                       
                        try {
-                               StorageManager storageManager = SandeshaUtil
-                                               
.getSandeshaStorageManager(context);
+                               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context);
                                NextMsgBeanMgr nextMsgMgr = 
storageManager.getNextMsgBeanMgr();
 
-                               InvokerBeanMgr storageMapMgr = storageManager
-                                               .getStorageMapBeanMgr();
+                               InvokerBeanMgr storageMapMgr = 
storageManager.getStorageMapBeanMgr();
 
                                SequencePropertyBeanMgr sequencePropMgr = 
storageManager
                                                .getSequencePropretyBeanMgr();
 
-                               Transaction preInvocationTransaction = 
storageManager.getTransaction();
+                               transaction = storageManager.getTransaction();
                                
                                //Getting the incomingSequenceIdList
                                SequencePropertyBean allSequencesBean = 
(SequencePropertyBean) sequencePropMgr
                                                .retrieve(
                                                                
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
                                                                
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-                               if (allSequencesBean == null)
-                                       continue;
-
-                               ArrayList allSequencesList = 
SandeshaUtil.getArrayListFromString (allSequencesBean
-                                               .getValue());
                                
-                               preInvocationTransaction.commit();
+                               if (allSequencesBean == null) {
+                                       continue;
+                               }
+                               ArrayList allSequencesList = 
SandeshaUtil.getArrayListFromString (allSequencesBean.getValue());
                                
                                Iterator allSequencesItr = 
allSequencesList.iterator();
-
+                               
                                currentIteration: while 
(allSequencesItr.hasNext()) {
-
                                        String sequenceId = (String) 
allSequencesItr.next();
                                        
-                                       Transaction invocationTransaction = 
storageManager.getTransaction();   //Transaction based invocation
+                                       //committing the old transaction
+                                       transaction.commit();
+                                       
+                                       //starting a new transaction for the 
new iteration.
+                                       transaction = 
storageManager.getTransaction();
                                        
                                        NextMsgBean nextMsgBean = 
nextMsgMgr.retrieve(sequenceId);
                                        if (nextMsgBean == null) {
-
                                                String message = "Next message 
not set correctly. Removing invalid entry.";
                                                log.debug(message);
                                                allSequencesItr.remove();
@@ -144,14 +145,12 @@
                                                //cleaning the invalid data of 
the all sequences.
                                                
allSequencesBean.setValue(allSequencesList.toString());
                                                
sequencePropMgr.update(allSequencesBean);       
-                                               
-                                               throw new SandeshaException 
(message);
+                                               continue;
                                        }
 
                                        long nextMsgno = 
nextMsgBean.getNextMsgNoToProcess();
                                        if (nextMsgno <= 0) { 
-                                               String message = "Invalid 
messaage number as the Next Message Number. Removing invalid entry";
-                                               
+                                               String message = "Invalid 
message number as the Next Message Number.";
                                                throw new 
SandeshaException(message);
                                        }
 
@@ -163,57 +162,36 @@
                                        
                                        while (stMapIt.hasNext()) {
 
-                                               InvokerBean stMapBean = 
(InvokerBean) stMapIt
-                                                               .next();
+                                               InvokerBean stMapBean = 
(InvokerBean) stMapIt.next();
                                                String key = 
stMapBean.getMessageContextRefKey();
 
-
                                                MessageContext msgToInvoke = 
storageManager.retrieveMessageContext(key,context);
+                                               RMMsgContext rmMsg = 
MsgInitializer.initializeMessage(msgToInvoke);
 
-                                               RMMsgContext rmMsg = 
MsgInitializer
-                                                               
.initializeMessage(msgToInvoke);
-                                               Sequence seq = (Sequence) rmMsg
-                                                               
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
-                                               long msgNo = 
seq.getMessageNumber().getMessageNumber();
-
+                                               //have to commit the 
transaction before invoking. This may get changed when WS-AT is available.
+                                               transaction.commit();
+                                               
                                                try {
-                                                       //Invoking the message.
-
-                                                       //currently Transaction 
based invocation can be supplied only for the in-only case.
-                                                       
-                                                       if 
(!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern()))
 {
-                                                               
invocationTransaction.commit();
-                                                       }
-                                                       
+                                                       //Invoking the message. 
                                                                                
                        
+                                                       
msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
                                                        new AxisEngine 
(msgToInvoke.getConfigurationContext())
                                                                        
.resume(msgToInvoke);
                                                        invoked = true;
-                                                       
-                                                       if 
(!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern()))
 {
-                                                               
invocationTransaction = storageManager.getTransaction();
-                                                       }                       
                        
-
                                                        
storageMapMgr.delete(key);
                                                } catch (AxisFault e) {
                                                        throw new 
SandeshaException(e);
+                                               } finally {
+                                                       transaction = 
storageManager.getTransaction();
                                                }
-
+                                               
                                                //updating the next msg to 
invoke
 
-
                                                if (rmMsg.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION) {
                                                        Sequence sequence = 
(Sequence) rmMsg
                                                                        
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
                                                        if 
(sequence.getLastMessage() != null) {
-                                                               
                                                                
TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId);
-                                                               
-                                                               //this sequence 
has no more invocations
-//                                                             
stopInvokerForTheSequence(sequenceId);
-                                                               
                                                                //exit from 
current iteration. (since an entry was removed)
-                                                               
invocationTransaction.commit();
                                                                break 
currentIteration;
                                                        }
                                                }
@@ -223,15 +201,19 @@
                                                nextMsgno++;
                                                
nextMsgBean.setNextMsgNoToProcess(nextMsgno);
                                                nextMsgMgr.update(nextMsgBean);
-                                               invocationTransaction.commit();
-                                       }
+                                       }       
                                }
                                
-                       } catch (SandeshaException e1) {
+                       } catch (Exception e1) {
                                e1.printStackTrace();
+                               if (transaction!=null) {
+                                       transaction.rollback();
+                                       rolebacked = true;
+                               }
+                       } finally { 
+                               if (!rolebacked && transaction!=null) 
+                                       transaction.commit();
                        }
                }
-               
-               int i = 1;
        }
 }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=405839&r1=405838&r2=405839&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Fri 
May 12 12:16:32 2006
@@ -58,11 +58,9 @@
 public class Sender extends Thread {
 
        private boolean runSender = false;
-       private boolean stopSenderAfterWork = false;
        private ArrayList workingSequences = new ArrayList();
        private ConfigurationContext context = null;
        private static final Log log = LogFactory.getLog(Sender.class);
-       private ThreadPool threadPool = new ThreadPool ();
 
        public synchronized void stopSenderForTheSequence(String sequenceID) {
                workingSequences.remove(sequenceID);
@@ -103,29 +101,29 @@
                                log.debug("End printing Interrupt...");
                        }
                        
+                       Transaction transaction = null;
+                       boolean rolebacked = false;
+                       
                        try {
                                if (context == null) {
                                        String message = "Can't continue the 
Sender. Context is null";
                                        log.debug(message);
                                        throw new SandeshaException(message);
                                }
-
-                               Transaction pickMessagesToSendTransaction = 
storageManager.getTransaction();
-
+                               
+                               transaction = storageManager.getTransaction();
+                               
                                SenderBeanMgr mgr = 
storageManager.getRetransmitterBeanMgr();
                                SenderBean senderBean = mgr.getNextMsgToSend();
                                if (senderBean==null) {
-                                       pickMessagesToSendTransaction.commit();
                                        continue;
                            }
-                               
 
                                MessageRetransmissionAdjuster 
retransmitterAdjuster = new MessageRetransmissionAdjuster();
                                boolean continueSending = 
retransmitterAdjuster.adjustRetransmittion(senderBean, context);
-                               if (!continueSending)
+                               if (!continueSending) {
                                        continue;
-                               
-                               pickMessagesToSendTransaction.commit();
+                               }
                                
                                String key = (String) 
senderBean.getMessageContextRefKey();
                                MessageContext msgCtx = 
storageManager.retrieveMessageContext(key, context);
@@ -161,8 +159,6 @@
                                
                                updateMessage(msgCtx);
 
-                               Transaction preSendTransaction = 
storageManager.getTransaction();
-
                                int messageType = rmMsgCtx.getMessageType();
                                if (messageType == 
Sandesha2Constants.MessageTypes.APPLICATION) {
                                        Sequence sequence = (Sequence) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -177,16 +173,16 @@
                                        
AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx);
                                }
                                
-                               preSendTransaction.commit();
-                               
                                //sending the message
                                TransportOutDescription transportOutDescription 
= msgCtx.getTransportOut();
                                TransportSender transportSender = 
transportOutDescription.getSender();
                                        
+                               //have to commit the transaction before 
sending. This may get changed when WS-AT is available.
+                               transaction.commit();
+                               
                                boolean successfullySent = false;
                                if (transportSender != null) {
                                        try {
-                                               
                                                //TODO change this to cater for 
security.
                                                transportSender.invoke(msgCtx);
                                                successfullySent = true;
@@ -194,11 +190,11 @@
                                                // TODO Auto-generated catch 
block
                                            log.debug("Could not send message");
                                                
log.debug(e.getStackTrace().toString());
+                                       } finally {
+                                               transaction =  
storageManager.getTransaction();
                                        }
                                }
 
-                               Transaction postSendTransaction = 
storageManager.getTransaction();
-
                                // update or delete only if the object is still 
present.
                                SenderBean bean1 = 
mgr.retrieve(senderBean.getMessageID());
                                if (bean1 != null) {
@@ -210,14 +206,13 @@
                                                
mgr.delete(bean1.getMessageID());
                                }
 
-                               postSendTransaction.commit(); // commiting the 
current transaction
-
+                               
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+                               
                                if (successfullySent) {
                                        if (!msgCtx.isServerSide())
                                                checkForSyncResponses(msgCtx);
                                }
-
-                               Transaction terminateCleaningTransaction = 
storageManager.getTransaction();
+                               
                                if (rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
                                        // terminate sending side.
                                        TerminateSequence terminateSequence = 
(TerminateSequence) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
@@ -228,15 +223,22 @@
                                        
TerminateManager.terminateSendingSide(configContext,internalSequenceID, 
msgCtx.isServerSide());
                                }
 
-                               terminateCleaningTransaction.commit();
+                               
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
                                
-                       } catch (AxisFault e) {
+                       } catch (Exception e) {
                                String message = "An Exception was throws in 
sending";
                                log.debug(message,e);
                                
                                // TODO : when this is the client side throw 
the exception to
                                // the client when necessary.
 
+                               if (transaction!=null) {
+                                       transaction.rollback();
+                                       rolebacked = true;
+                               }
+                       } finally {
+                               if (transaction!=null && !rolebacked) 
+                                       transaction.commit();
                        }
                }
        }
@@ -311,7 +313,12 @@
                                log.debug("Valid SOAP envelope not found");
                                log.debug(e.getStackTrace().toString());
                        }
-
+                       
+                       //if the request msg ctx is within a transaction, 
processing of the response should also happen
+                       //within the same transaction
+                       
responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION
+                                       
,msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
+                       
                        if (resenvelope != null) {
                                responseMessageContext.setEnvelope(resenvelope);
                                AxisEngine engine = new AxisEngine(msgCtx



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to