Author: mckierna
Date: Mon Sep 10 23:55:50 2007
New Revision: 574490

URL: http://svn.apache.org/viewvc?rev=574490&view=rev
Log:
performance improvements

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 Mon Sep 10 23:55:50 2007
@@ -84,10 +84,11 @@
                                return returnValue ;
                        }
                }
-               
+
+           StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
                //this will change the execution chain of this message to work 
correctly in retransmissions.
                //For e.g. Phases like security will be removed to be called in 
each retransmission.
-               SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
+           SandeshaUtil.modifyExecutionChainForStoring(msgCtx, storageManager);
 
                String DONE = (String) 
msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
                if (null != DONE && "true".equals(DONE)) {
@@ -97,7 +98,6 @@
                }
                
                
msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
 
                Transaction transaction = null;
 
@@ -126,7 +126,7 @@
                        }
 
                        if (msgProcessor != null){
-                               if(msgProcessor.processOutMessage(rmMsgCtx)){
+                       if(msgProcessor.processOutMessage(rmMsgCtx, 
transaction)){
                                        //the msg was paused
                                        returnValue = 
InvocationResponse.SUSPEND;
                                }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -266,7 +266,7 @@
                        
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
                        
                        // passing the message through sandesha2sender
-                       SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+                   SandeshaUtil.executeAndStore(ackRMMsgCtx, key, 
storageManager);
 
                        // inserting the new Ack.
                        senderBeanMgr.insert(ackBean);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -116,6 +116,12 @@
                String outSequenceId = 
sequenceAck.getIdentifier().getIdentifier();
                RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
 
+               if(rmsBean==null){
+                 if (log.isDebugEnabled())
+                         log.debug("Exit: 
AcknowledgementProcessor::processAckHeader, Sequence bean not found");
+                 return;
+               }
+               
                if (outSequenceId == null || "".equals(outSequenceId)) {
                        String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outSeqIDIsNull);
                        log.debug(message);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -51,6 +51,9 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
 import org.apache.sandesha2.wsrm.CreateSequence;
 import org.apache.sandesha2.wsrm.SequenceOffer;
 
@@ -82,7 +85,7 @@
                return false;
        }
        
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) throws 
AxisFault {
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
tran) throws AxisFault {
                if (log.isDebugEnabled())
                        log.debug("Enter: 
ApplicationMsgProcessor::processOutMessage");
 
@@ -252,7 +255,7 @@
                                // server and the client sides.
                                if (rmsBean == null) {
                                        rmsBean = 
SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, 
storageManager);
-                                       rmsBean = 
addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+                                       rmsBean = 
addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager, tran);
                                }
                        }
                
@@ -405,7 +408,7 @@
                
                // processing the response if not an dummy.
                if (!dummyMessage)
-                       processResponseMessage(rmMsgCtx, rmsBean, 
internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager);
+                       processResponseMessage(rmMsgCtx, rmsBean, 
internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager, 
tran);
                
                //Users wont be able to get reliable response msgs in the back 
channel in the back channel of a 
                //reliable message. If he doesn't have a endpoint he should use 
polling mechanisms.
@@ -417,7 +420,7 @@
        }
 
        private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, 
RMSBean rmsBean,
-                       StorageManager storageManager) throws AxisFault {
+                       StorageManager storageManager, Transaction tran) throws 
AxisFault {
 
                if (log.isDebugEnabled())
                        log.debug("Enter: 
ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
@@ -484,7 +487,7 @@
 
                
createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
                
-               SandeshaUtil.executeAndStore(createSeqRMMessage, 
createSequenceMessageStoreKey);
+               SandeshaUtil.executeAndStore(createSeqRMMessage, 
createSequenceMessageStoreKey, storageManager);
 
                retransmitterMgr.insert(createSeqEntry);
 
@@ -497,7 +500,7 @@
        }
 
        private void processResponseMessage(RMMsgContext rmMsg, RMSBean 
rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
-                       String storageKey, StorageManager storageManager) 
throws AxisFault {
+                   String storageKey, StorageManager storageManager, 
Transaction tran) throws AxisFault {
                if (log.isDebugEnabled())
                        log.debug("Enter: 
ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " 
+ outSequenceID);
 
@@ -521,6 +524,11 @@
                        }
                }
 
+               boolean sendingNow = false;
+               if(outSequenceID != null && 
!storageManager.hasUserTransaction(msg)) {
+                 sendingNow = true;
+               }
+               
                // Now that we have decided which sequence to use for the 
message, make sure that we secure
                // it with the correct token.
                RMMsgCreator.secureOutboundMessage(rmsBean, msg);
@@ -563,11 +571,35 @@
                // increasing the current handler index, so that the message 
will not be
                // going throught the SandeshaOutHandler again.
                msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
-
-               SandeshaUtil.executeAndStore(rmMsg, storageKey);
+               SandeshaUtil.executeAndStore(rmMsg, storageKey, storageManager);
+               
+               // Lock the sender bean before we insert it, if we are planning 
to send it ourselves
+               SenderWorker worker = null;
+               if(sendingNow) {
+                 String workId = appMsgEntry.getMessageID() + 
appMsgEntry.getTimeToSend();
+                 SandeshaThread sender = storageManager.getSender();
+                 ConfigurationContext context = msg.getConfigurationContext();
+                 WorkerLock lock = sender.getWorkerLock();
+      
+                 worker = new SenderWorker(context, appMsgEntry, 
rmsBean.getRMVersion());
+                 worker.setLock(lock);
+                 worker.setWorkId(workId);
+                 // Actually take the lock
+                 lock.addWork(workId, worker);
+               }
 
                retransmitterMgr.insert(appMsgEntry);
 
+               // Commit the transaction, so that the sender worker starts 
with a clean slate.
+               if(tran != null && tran.isActive()) tran.commit();
+                
+               if(worker != null) {
+                 try {
+                   worker.run();
+                 } catch(Exception e)  {
+                   log.error("Caught exception running SandeshaWorker", e);
+                 }
+               }
                if (log.isDebugEnabled())
                        log.debug("Exit: 
ApplicationMsgProcessor::processResponseMessage");
        }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -141,7 +141,7 @@
                return false;
        }
 
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) throws 
AxisFault {
+        public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction) throws AxisFault {
                if (log.isDebugEnabled()) 
                        log.debug("Enter: 
CloseSequenceProcessor::processOutMessage");
                

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -343,7 +343,7 @@
                return true;
        }
 
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction) {
                if (log.isDebugEnabled())
                        log.debug("Enter: 
CreateSeqMsgProcessor::processOutMessage");
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -262,7 +262,7 @@
                return true;
        }
 
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction) {
                if (log.isDebugEnabled()) {
                        log.debug("Enter: 
CreateSeqResponseMsgProcessor::processOutMessage");
                        log.debug("Exit: 
CreateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -51,7 +51,7 @@
                return true;
        }
 
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction) {
                return false;
        }
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -211,7 +211,7 @@
                messagePending.toSOAPEnvelope(returnMessage.getEnvelope());
        }
 
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction) {
                return false;
        }
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -40,5 +40,5 @@
         * @return true if the msg context has been paused
         * @throws AxisFault
         */
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) throws 
AxisFault;
-}
\ No newline at end of file
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction)throws AxisFault;
+}

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -28,6 +28,7 @@
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportUtils;
 import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
@@ -345,6 +346,18 @@
                        RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, 
storageManager,true);
 
                        AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, 
sequenceId, timeToSend, storageManager);
+                       
+                       // If the MEP doesn't need the backchannel, and nor do 
we, we should signal it so that it
+                       // can close off as soon as possible.
+                       result = InvocationResponse.ABORT;
+                       RequestResponseTransport t = null;
+                       t = (RequestResponseTransport) 
rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+                       
+                       // Tell the transport that there will be no response 
message
+                       if(t != null) {
+                               TransportUtils.setResponseWritten(msgCtx, 
false);
+                               t.acknowledgeMessage(msgCtx);
+                       }
                }
                
                // If this message matches the WSRM 1.0 pattern for an empty 
last message (e.g.

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -349,7 +349,7 @@
                return terminateSeqResponseRMMsg;
        }
 
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) throws 
AxisFault {
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction) throws AxisFault {
 
                if (log.isDebugEnabled())
                        log.debug("Enter: 
TerminateSeqMsgProcessor::processOutMessage");

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 Mon Sep 10 23:55:50 2007
@@ -89,7 +89,7 @@
                return true;
   }
 
-       public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+       public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction 
transaction) {
                if(log.isDebugEnabled()) log.debug("Enter: 
TerminateSeqResponseMsgProcessor::processOutMessage");
                if(log.isDebugEnabled()) log.debug("Exit: 
TerminateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
                return false;

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
 Mon Sep 10 23:55:50 2007
@@ -149,17 +149,18 @@
                        // This sequence must have been terminated, or deleted
                        stopThreadForSequence(entry.getSequenceId(), true);
                } else {
-      if (log.isDebugEnabled())
-        log.debug("Polling rms " + beanToPoll);
+                       if (log.isDebugEnabled())
+                               log.debug("Polling rms " + beanToPoll);
                        // The sequence is there, but we still only poll if we 
are expecting reply messages,
                        // or if we don't have clean ack state.
-      boolean cleanAcks = false;
-      if (beanToPoll.getNextMessageNumber() > -1)
-       cleanAcks = 
AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(),
 beanToPoll.getNextMessageNumber());
+                       boolean cleanAcks = false;
+                       if (beanToPoll.getNextMessageNumber() > -1)
+                               cleanAcks = 
AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(),
 beanToPoll.getNextMessageNumber());
                        long  repliesExpected = beanToPoll.getExpectedReplies();
-                       if((force ||    !cleanAcks || repliesExpected > 0) && 
beanToPoll.getReferenceMessageStoreKey() != null)
-                               pollForSequence(beanToPoll.getAnonymousUUID(), 
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), 
beanToPoll, entry);
+                       if(beanToPoll.getSequenceID() != null && (force || 
!cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() 
!= null)
+                           pollForSequence(beanToPoll.getAnonymousUUID(), 
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), 
beanToPoll, entry);
                }
+               
 
                if(log.isDebugEnabled()) log.debug("Exit: 
PollingManager::pollRMSSide");
        }
@@ -262,7 +263,7 @@
                        //this message should not be sent until it is 
qualified. I.e. till it is sent through the Sandesha2TransportSender.
                        
makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
                        
-                       SandeshaUtil.executeAndStore(makeConnectionRMMessage, 
makeConnectionMsgStoreKey);
+               SandeshaUtil.executeAndStore(makeConnectionRMMessage, 
makeConnectionMsgStoreKey, storageManager);
                        
                        senderBeanMgr.insert(makeConnectionSenderBean);         
        
                }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
 Mon Sep 10 23:55:50 2007
@@ -100,5 +100,16 @@
        public abstract MessageContext retrieveMessageContext (String 
storageKey, ConfigurationContext configContext) throws SandeshaStorageException;
 
        public abstract void removeMessageContext (String storageKey) throws 
SandeshaStorageException;
-
+       
+       
+       /**
+        * If there is no user transaction in scope then we can optimize the 
sending / invoking of a
+        * message. This method allows the StorageManager to tell the core 
Sandesha code if there
+        * is a transaction in scope.
+        * @return true, if there is a user transaction in scope.
+        */
+       public abstract boolean hasUserTransaction(MessageContext message) 
throws SandeshaStorageException;
+        
+       public abstract boolean requiresMessageSerialization();
+       
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Mon Sep 10 23:55:50 2007
@@ -338,6 +338,15 @@
        public void  initStorage (AxisModule moduleDesc) {
                
        }
+       
+       //We do not support user transactions in-memory
+       public boolean hasUserTransaction(MessageContext msg) {
+               return false;
+       }
+         
+       public boolean requiresMessageSerialization() {
+               return useSerialization;
+       }
 
        private class SerializedStorageEntry {
                MessageContext       message;
@@ -355,6 +364,7 @@
                SOAPEnvelope   envelope;
        }
 }
+
 
 
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
 Mon Sep 10 23:55:50 2007
@@ -18,9 +18,7 @@
 package org.apache.sandesha2.util;
 
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Set;
 
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
@@ -106,63 +104,48 @@
                        if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
                        return transaction;
                }
-               
-               // From here on, we must be dealing with a real address. 
Piggyback all sequences that have an
-               // acksTo that matches the To address, and that have an 
ackMessage queued up for sending.
-               Set acked = new HashSet();
-               SenderBean findBean = new SenderBean();
-               findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-               findBean.setSend(true);
-               findBean.setToAddress(target.getAddress());
-
-               Collection collection = retransmitterBeanMgr.find(findBean);
-               
-               if (transaction != null && transaction.isActive())
-                       transaction.commit();
-               
-               transaction = storageManager.getTransaction();
-               
-               Iterator it = collection.iterator();
-               while (it.hasNext()) {
-                       SenderBean ackBean = (SenderBean) it.next();
-
+           // From here on, we must be dealing with a real address. Piggyback all sequences that have an
+           // acksTo that matches the To address, and that have an ackMessage queued up for sending. We
+           // search for RMDBeans first, to avoid a deadlock.
+           //
+           // As a special case, if this is a terminate sequence message then add in ack messages for
+           // any sequences that have an acksTo that matches the target address. This helps to ensure
+           // that request-response sequence pairs end cleanly.
+           RMDBean findRMDBean = new RMDBean();
+           findRMDBean.setAcksToEPR(target.getAddress());
+           findRMDBean.setTerminated(false);
+           Collection rmdBeans = 
storageManager.getRMDBeanMgr().find(findRMDBean);
+           Iterator sequences = rmdBeans.iterator();
+           while(sequences.hasNext()) {
+             RMDBean sequence = (RMDBean) sequences.next();
+             String sequenceId = sequence.getSequenceID();
+             
+             // Look for the SenderBean that carries the ack, there should be 
at most one
+             SenderBean findBean = new SenderBean();
+             findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+             findBean.setSend(true);
+             findBean.setSequenceID(sequenceId);
+             findBean.setToAddress(target.getAddress());
+             
+             SenderBean ackBean = retransmitterBeanMgr.findUnique(findBean);
+             
                        // Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
                        long timeNow = System.currentTimeMillis();
-                       if (ackBean.getTimeToSend() > timeNow) {
+                   if (ackBean != null && ackBean.getTimeToSend() > timeNow) {
                                // Delete the beans that would have sent the ack
                                
retransmitterBeanMgr.delete(ackBean.getMessageID());
                                
storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
 
-                               String sequenceId = ackBean.getSequenceID();
                                if (log.isDebugEnabled()) 
log.debug("Piggybacking ack for sequence: " + sequenceId);
+                       RMMsgCreator.addAckMessage(rmMessageContext, 
sequenceId, sequence);
+
+                   } else if(rmMessageContext.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+                       if(log.isDebugEnabled()) log.debug("Adding extra acks, 
as this is a terminate");
+                         
+                       if(sequence.getHighestInMessageNumber() > 0) {
+                                         if(log.isDebugEnabled()) 
log.debug("Piggybacking ack for sequence: " + sequenceId);
 
-                               RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
-                               if(rmdBean != null && !rmdBean.isTerminated()) {
-                                       
RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, rmdBean);
-                               }
-                               acked.add(sequenceId);
-                       }
-               }
-               
-               // As a special case, if this is a terminate sequence message 
then add in ack messages for
-               // any sequences that have an acksTo that matches the target 
address. This helps to ensure
-               // that request-response sequence pairs end cleanly.
-               if(rmMessageContext.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-                       if(log.isDebugEnabled()) log.debug("Adding extra acks, 
as this is a terminate");
-                       
-                       RMDBean findRMDBean = new RMDBean();
-                       findRMDBean.setAcksToEPR(target.getAddress());
-                       findRMDBean.setTerminated(false);
-                       Collection rmdBeans = 
storageManager.getRMDBeanMgr().find(findRMDBean);
-                       Iterator sequences = rmdBeans.iterator();
-                       while(sequences.hasNext()) {
-                               RMDBean sequence = (RMDBean) sequences.next();
-                               String sequenceId = sequence.getSequenceID();
-                               
-                               if(!acked.contains(sequenceId) && 
sequence.getHighestInMessageNumber() > 0) {
-                                       if(log.isDebugEnabled()) 
log.debug("Piggybacking ack for sequence: " + sequenceId);
                                        
RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence);
-                                       acked.add(sequenceId);
                                }
                        }
                }
@@ -313,7 +296,7 @@
                // passing the message through sandesha2sender
                ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, 
Sandesha2Constants.VALUE_TRUE);
                
-               SandeshaUtil.executeAndStore(ackRMMsgContext, key);
+           SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
 
                // inserting the new ack.
                retransmitterBeanMgr.insert(ackBean);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 Mon Sep 10 23:55:50 2007
@@ -25,6 +25,7 @@
 import java.util.Map;
 
 import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
 
 import org.apache.axiom.om.OMElement;
@@ -883,7 +884,7 @@
                else 
                        return false;
        }
-       public static void executeAndStore (RMMsgContext rmMsgContext, String 
storageKey) throws AxisFault {
+        public static void executeAndStore (RMMsgContext rmMsgContext, String 
storageKey, StorageManager manager) throws AxisFault {
                if (log.isDebugEnabled())
                        log.debug("Enter: SandeshaUtil::executeAndStore, " + 
storageKey);
                
@@ -891,7 +892,7 @@
                ConfigurationContext configurationContext = 
msgContext.getConfigurationContext();
 
                SandeshaPolicyBean policy = 
getPropertyBean(msgContext.getAxisOperation());
-               if(policy.isUseMessageSerialization()) {
+           if(manager.requiresMessageSerialization()) {
                        
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_TRUE);
 
                        StorageManager store = 
getSandeshaStorageManager(configurationContext, 
configurationContext.getAxisConfiguration());
@@ -908,27 +909,26 @@
        
                        Sandesha2TransportOutDesc sandesha2TransportOutDesc = 
new Sandesha2TransportOutDesc();
                        msgContext.setTransportOut(sandesha2TransportOutDesc);
+                       
+                       //this invocation has to be a blocking one.
+                       Boolean isTransportNonBlocking = (Boolean) 
msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+                       if (isTransportNonBlocking!=null && 
isTransportNonBlocking.booleanValue())
+                               
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
        
                        // sending the message once through 
Sandesha2TransportSender.
                        if (msgContext.isPaused())
                                AxisEngine.resumeSend(msgContext);
                        else {
-                               //this invocation has to be a blocking one.
-                               
-                               Boolean isTransportNonBlocking = (Boolean) 
msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
-                               if (isTransportNonBlocking!=null && 
isTransportNonBlocking.booleanValue())
-                                       
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
-                               
-                               AxisEngine.send(msgContext);
-                               
-                               
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, 
isTransportNonBlocking);
+                               AxisEngine.send(msgContext);    
                        }
+                       //put the original value of isTransportNonBlocking back on
+                       
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, 
isTransportNonBlocking);
                }
                if (log.isDebugEnabled())
                        log.debug("Exit: SandeshaUtil::executeAndStore");
        }
        
-       public static void modifyExecutionChainForStoring (MessageContext 
message)
+       public static void modifyExecutionChainForStoring (MessageContext 
message, StorageManager manager)
        throws SandeshaException
        {
                
@@ -936,8 +936,7 @@
                if (property!=null)
                        return; //Phases are already set. Dont hv to redo.
                
-               SandeshaPolicyBean policy = 
getPropertyBean(message.getAxisOperation());
-               if(policy.isUseMessageSerialization())
+           if(manager.requiresMessageSerialization())
                        return; // No need to mess with the transport when we use message serialization
                
                TransportOutDescription transportOutDescription = 
message.getTransportOut();
@@ -1120,40 +1119,53 @@
        }       
        
        
-       public static SOAPEnvelope cloneEnvelope(SOAPEnvelope envelope) throws 
SandeshaException {
-               
-               // Now clone the env and set it in the message context
-               XMLStreamReader streamReader = 
envelope.cloneOMElement().getXMLStreamReader();
-               SOAPEnvelope clonedEnvelope = new 
StAXSOAPModelBuilder(streamReader, null).getSOAPEnvelope();
-
-               // you have to explicitely set the 'processed' attribute for 
header
-               // blocks, since it get lost in the above read from the stream.
-
-               SOAPHeader header = envelope.getHeader();
-               if (header != null) {
-                       Iterator childrenOfOldEnv = header.getChildElements();
-                       Iterator childrenOfNewEnv = 
clonedEnvelope.getHeader().getChildElements();
-                       while (childrenOfOldEnv.hasNext()) {
-                               
-                               SOAPHeaderBlock oldEnvHeaderBlock = 
(SOAPHeaderBlock) childrenOfOldEnv.next();
-                               SOAPHeaderBlock newEnvHeaderBlock = 
(SOAPHeaderBlock) childrenOfNewEnv.next();
+  public static SOAPEnvelope cloneEnvelope(SOAPEnvelope envelope) throws 
SandeshaException {
+           
+           // Now clone the env and set it in the message context. We need to be sure that we
+    // close off the stream reader, in order to free up some of the heap.
+    XMLStreamReader streamReader = null;
+    SOAPEnvelope clonedEnvelope = null;
+    try {
+      streamReader = envelope.getXMLStreamReader();     
+      clonedEnvelope = new StAXSOAPModelBuilder(streamReader, 
null).getSOAPEnvelope();
+      // you have to explicitely set the 'processed' attribute for header
+      // blocks, since it get lost in the above read from the stream.
+     
+      SOAPHeader header = envelope.getHeader();
+      if (header != null) {
+        Iterator childrenOfOldEnv = header.getChildElements();
+        Iterator childrenOfNewEnv = 
clonedEnvelope.getHeader().getChildElements();
+        while (childrenOfOldEnv.hasNext()) {        
+          SOAPHeaderBlock oldEnvHeaderBlock = (SOAPHeaderBlock) 
childrenOfOldEnv.next();
+          SOAPHeaderBlock newEnvHeaderBlock = (SOAPHeaderBlock) 
childrenOfNewEnv.next();
+
+          QName oldEnvHeaderBlockQName = oldEnvHeaderBlock.getQName();
+          if (oldEnvHeaderBlockQName != null) {
+            if (oldEnvHeaderBlockQName.equals(newEnvHeaderBlock.getQName())) {
+              if (oldEnvHeaderBlock.isProcessed())
+                newEnvHeaderBlock.setProcessed();
+              } else {
+                String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cloneDoesNotMatchToOriginal);
+                throw new SandeshaException(message);
+              }
+            }
+          }
+        }
+        // Completely build the new tree
+        clonedEnvelope.build();
+      } finally {
+      if(streamReader != null) {
+        try {
+          streamReader.close();
+        } catch(XMLStreamException e) {
+          log.debug("Caught exception closing stream", e);
+        }
+      }
 
-                               QName oldEnvHeaderBlockQName = 
oldEnvHeaderBlock.getQName();
-                               if (oldEnvHeaderBlockQName != null) {
-                                       if 
(oldEnvHeaderBlockQName.equals(newEnvHeaderBlock.getQName())) {
-                                               if 
(oldEnvHeaderBlock.isProcessed())
-                                                       
newEnvHeaderBlock.setProcessed();
-                                       } else {
-                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cloneDoesNotMatchToOriginal);
-                                               throw new 
SandeshaException(message);
-                                       }
-                               }
-                       }
-               }
-               
-               return clonedEnvelope;
-       }
-       
+    }
+    return clonedEnvelope;
+  }
+         
        public static final String getStackTraceFromException(Exception e) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     PrintWriter pw = new PrintWriter(baos);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
 Mon Sep 10 23:55:50 2007
@@ -366,7 +366,7 @@
 //             
terminateRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
 sequencePropertyKey);
                
                // / addTerminateSeqTransaction.commit();
-               SandeshaUtil.executeAndStore(terminateRMMessage, key);
+           SandeshaUtil.executeAndStore(terminateRMMessage, key, 
storageManager);
                
                SenderBeanMgr retramsmitterMgr = 
storageManager.getSenderBeanMgr();
                

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
 Mon Sep 10 23:55:50 2007
@@ -167,7 +167,7 @@
 
                SenderBeanMgr retramsmitterMgr = 
storageManager.getSenderBeanMgr();
                
-               SandeshaUtil.executeAndStore(rmMsgCtx, key);
+           SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
        
                retramsmitterMgr.insert(senderBean);
                

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
 Mon Sep 10 23:55:50 2007
@@ -124,7 +124,7 @@
                                        
                                                //adding the workId to the lock after assigning it to a thread makes sure 
                                                //that all the workIds in the Lock are handled by threads.
-                                               getWorkerLock().addWork(workId);
+                                   getWorkerLock().addWork(workId, worker);
 
                                                long msgNumber = 
invoker.getMsgNo();
                                                //if necessary, update the 
"next message number" bean under this transaction
@@ -349,12 +349,14 @@
                                if(contextMgr != null) {
                                        work = contextMgr.wrapWithContext(work, 
bean.getContext());
                                }
-                               threadPool.execute(work);
-                               
-                               //adding the workId to the lock after assigning 
it to a thread makes sure 
-                               //that all the workIds in the Lock are handled 
by threads.
-                               getWorkerLock().addWork(workId);
-                               
+                               try {
+                                       // Set the lock up before we start the thread, but roll it back
+                                       // if we hit any problems
+                                       getWorkerLock().addWork(workId, worker);
+                                       threadPool.execute(work);
+                               } catch(Exception e) {
+                                       getWorkerLock().removeWork(workId);
+                               }                               
                                processedMessage = true;
                        }
                        

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 Mon Sep 10 23:55:50 2007
@@ -1,5 +1,7 @@
 package org.apache.sandesha2.workers;
 
+import org.apache.axiom.om.impl.builder.StAXBuilder;
+import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
@@ -7,6 +9,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.util.MessageContextBuilder;
 import org.apache.commons.logging.Log;
@@ -85,19 +88,30 @@
                                                && 
Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
                                        postFailureInvocation = true;
 
+                       InvocationResponse response = null;
                                if (postFailureInvocation) {
                                        
makeMessageReadyForReinjection(msgToInvoke);
                                        if (log.isDebugEnabled())
                                                log.debug("Receiving message, 
key=" + messageContextKey + ", msgCtx="
                                                                + 
msgToInvoke.getEnvelope().getHeader());
-                                       AxisEngine.receive(msgToInvoke);
+                                       response = 
AxisEngine.receive(msgToInvoke);
                                } else {
                                        if (log.isDebugEnabled())
                                                log.debug("Resuming message, 
key=" + messageContextKey + ", msgCtx="
                                                                + 
msgToInvoke.getEnvelope().getHeader());
                                        msgToInvoke.setPaused(false);
-                                       AxisEngine.resumeReceive(msgToInvoke);
+                                       response = 
AxisEngine.resumeReceive(msgToInvoke);
                                }
+                       if(!InvocationResponse.SUSPEND.equals(response)) {
+                           // Performance work - need to close the XMLStreamReader to prevent GC thrashing.
+                           SOAPEnvelope env = msgToInvoke.getEnvelope();
+                           if(env!=null){
+                             StAXBuilder sb = 
(StAXBuilder)msgToInvoke.getEnvelope().getBuilder();
+                             if(sb!=null){
+                               sb.close();
+                             }
+                           }
+                       }
 
                        } catch (Exception e) {
                                if (log.isDebugEnabled())
@@ -105,7 +119,6 @@
 
                                handleFault(rmMsg, e);
                        }
-
 
                        transaction = storageManager.getTransaction();
                         

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
 Mon Sep 10 23:55:50 2007
@@ -131,8 +131,10 @@
                        }
                }
                
-               if (log.isDebugEnabled())
-                       log.debug("Exit: SandeshaThread::stopRunning, " + this);
+           // In a unit test, tracing 'this' once the thread was stopped caused
+           // an exception, so we just trace exit.
+           if (log.isDebugEnabled())
+             log.debug("Exit: SandeshaThread::stopRunning");
        }
        
        public synchronized boolean isThreadStarted() {

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 Mon Sep 10 23:55:50 2007
@@ -200,12 +200,15 @@
                        SenderWorker worker = new SenderWorker(context, 
senderBean, rmVersion);
                        worker.setLock(getWorkerLock());
                        worker.setWorkId(workId);
-                       threadPool.execute(worker);
-
-                       // adding the workId to the lock after assigning it to 
a thread
-                       // makes sure
-                       // that all the workIds in the Lock are handled by 
threads.
-                       getWorkerLock().addWork(workId);
+                       
+                       try {
+                               // Set the lock up before we start the thread, but roll it back
+                               // if we hit any problems
+                               getWorkerLock().addWork(workId, worker);
+                               threadPool.execute(worker);
+                       } catch(Exception e) {
+                               getWorkerLock().removeWork(workId);
+                       }                       
 
                        // If we got to here then we found work to do on the 
sequence, so we should
                        // remember not to sleep at the end of the list of 
sequences.

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 Mon Sep 10 23:55:50 2007
@@ -4,6 +4,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.axiom.om.impl.builder.StAXBuilder;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
@@ -76,6 +77,12 @@
                
                if (log.isDebugEnabled())
                        log.debug("Enter: SenderWorker::run");
+               
+           // If we are not the holder of the correct lock, then we have to stop
+           if(lock != null && !lock.ownsLock(workId, this)) {
+             if (log.isDebugEnabled()) log.debug("Exit: SenderWorker::run, 
another worker holds the lock");
+             return;
+           }
 
                Transaction transaction = null;
                
@@ -253,8 +260,7 @@
                        try {
                                InvocationResponse response = 
InvocationResponse.CONTINUE;
                                
-                               SandeshaPolicyBean policy = 
SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation());
-                               if(policy.isUseMessageSerialization()) {
+                       if(storageManager.requiresMessageSerialization()) {
                                        if(msgCtx.isPaused()) {
                                                if (log.isDebugEnabled())
                                                        log.debug("Resuming a 
send for message : " + msgCtx.getEnvelope().getHeader());
@@ -268,12 +274,6 @@
                                                AxisEngine.send(msgCtx);  // 
TODO check if this should return an invocation response
                                        }
                                } else {
-                                       // had to fully build the SOAP envelope 
to support
-                                       // retransmissions.
-                                       // Otherwise a 'parserAlreadyAccessed' 
exception could
-                                       // get thrown in retransmissions.
-                                       // But this has a performance reduction.
-                                       msgCtx.getEnvelope().build();
        
                                        ArrayList retransmittablePhases = 
(ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
                                        if (retransmittablePhases!=null) {
@@ -289,6 +289,7 @@
                                
                                        if (log.isDebugEnabled())
                                                log.debug("Resuming a send for 
message : " + msgCtx.getEnvelope().getHeader());
+                               
msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
                                        response = 
AxisEngine.resumeSend(msgCtx);
                                }
                                if(log.isDebugEnabled()) log.debug("Engine 
resume returned " + response);
@@ -647,9 +648,21 @@
                                responseMessageContext.setSoapAction("");
                        }
 
+               InvocationResponse response = null;
+               
                        if (resenvelope!=null) {
-                               AxisEngine.receive(responseMessageContext);
+                               response = 
AxisEngine.receive(responseMessageContext);
                        }
+               if(!InvocationResponse.SUSPEND.equals(response)) {
+                   // Performance work - need to close the XMLStreamReader to prevent GC thrashing.
+                   SOAPEnvelope env = responseMessageContext.getEnvelope();
+                   if(env!=null){
+                     StAXBuilder sb = 
(StAXBuilder)responseMessageContext.getEnvelope().getBuilder();
+                     if(sb!=null){
+                       sb.close();
+                     }            
+                   }
+               }
 
                } catch (Exception e) {
                        String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
 Mon Sep 10 23:55:50 2007
@@ -1,25 +1,47 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
 package org.apache.sandesha2.workers;
 
-import java.util.ArrayList;
+import java.util.HashMap;
 
 public class WorkerLock {
 
-       public ArrayList workList = null;
+  private HashMap locks = new HashMap();
        
        public WorkerLock () {
-               workList = new ArrayList ();
+
        }
        
-       public synchronized void addWork (String work) {
-               workList.add(work);
-       }
+  public synchronized void addWork (String work, Object owner) {
+    if(locks.containsKey(work)) return;
+    locks.put(work, owner);
+  }
        
        public synchronized void removeWork (String work) {
-               workList.remove(work);
+    locks.remove(work);
        }
        
        public synchronized boolean isWorkPresent (String work) {
-               return workList.contains(work);
+    return locks.containsKey(work);
        }
+       
+        public synchronized boolean ownsLock(String work, Object owner) {
+           Object realOwner = locks.get(work);
+           return realOwner == owner;
+         }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to