Author: mlovett
Date: Tue Nov 28 03:08:23 2006
New Revision: 479987

URL: http://svn.apache.org/viewvc?view=rev&rev=479987
Log:
Andy's patch to ensure each message is only stored once, instead of being 
stored and then updated. See SANDESHA2-52.

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 Tue Nov 28 03:08:23 2006
@@ -269,16 +269,15 @@
 
                        ackBean.setTimeToSend(timeToSend);
 
-                       storageManager.storeMessageContext(key, ackMsgCtx);
                        
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
                        
-                       // inserting the new ack.
-                       retransmitterBeanMgr.insert(ackBean);
-
                        // passing the message through sandesha2sender
 
                        SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
 
+                       // inserting the new ack.
+                       retransmitterBeanMgr.insert(ackBean);
+
                        
SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
 
                        msgContext.pause();
@@ -386,8 +385,6 @@
                SenderBean ackRequestBean = new SenderBean();
                ackRequestBean.setMessageContextRefKey(key);
 
-               storageManager.storeMessageContext(key, msgContext);
-
                // Set a retransmitter lastSentTime so that terminate will be 
send with
                // some delay.
                // Otherwise this get send before return of the current request 
(ack).
@@ -413,9 +410,9 @@
                ackRequestBean.setInternalSequenceID(internalSeqenceID);
                ackRequestBean.setSequenceID(outSequenceID);
                
-               retramsmitterMgr.insert(ackRequestBean);
-
                SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
+
+               retramsmitterMgr.insert(ackRequestBean);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Tue Nov 28 03:08:23 2006
@@ -577,12 +577,11 @@
                        createSeqEntry.setToAddress(to.getAddress());
 
                
createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-               
storageManager.storeMessageContext(createSequenceMessageStoreKey, 
createSeqMsg); // storing the message
                
+               SandeshaUtil.executeAndStore(createSeqRMMessage, 
createSequenceMessageStoreKey);
+
                retransmitterMgr.insert(createSeqEntry);
 
-               SandeshaUtil.executeAndStore(createSeqRMMessage, 
createSequenceMessageStoreKey);
-               
                if (log.isDebugEnabled())
                        log.debug("Exit: 
ApplicationMsgProcessor::addCreateSequenceMessage");
        }
@@ -752,16 +751,16 @@
                        appMsgEntry.setToAddress(to.getAddress());
                
                appMsgEntry.setInternalSequenceID(internalSequenceId);
-               storageManager.storeMessageContext(storageKey, msg);
 
                msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-               retransmitterMgr.insert(appMsgEntry);
 
                // increasing the current handler index, so that the message 
will not be
                // going throught the SandeshaOutHandler again.
                msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
 
                SandeshaUtil.executeAndStore(rmMsg, storageKey);
+
+               retransmitterMgr.insert(appMsgEntry);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
ApplicationMsgProcessor::processResponseMessage");

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 Tue Nov 28 03:08:23 2006
@@ -249,8 +249,6 @@
                SenderBean closeBean = new SenderBean();
                closeBean.setMessageContextRefKey(key);
 
-               storageManager.storeMessageContext(key, msgContext);
-
                closeBean.setTimeToSend(System.currentTimeMillis());
 
                closeBean.setMessageID(msgContext.getMessageID());
@@ -272,9 +270,9 @@
                closeBean.setSequenceID(outSequenceID);
                closeBean.setInternalSequenceID(internalSeqenceID);
 
-               retramsmitterMgr.insert(closeBean);
-
                SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+               retramsmitterMgr.insert(closeBean);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Tue Nov 28 03:08:23 2006
@@ -394,12 +394,9 @@
                        }
 
                        ackBean.setTimeToSend(timeToSend);
-                       storageManager.storeMessageContext(key, ackMsgCtx);
 
                        
ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
                        
-                       // inserting the new ack.
-                       retransmitterBeanMgr.insert(ackBean);
                        // / asyncAckTransaction.commit();
 
                        // passing the message through sandesha2sender
@@ -407,6 +404,9 @@
                        ackRMMsgCtx = 
MsgInitializer.initializeMessage(ackMsgCtx);
                        
                        SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+                       // inserting the new ack.
+                       retransmitterBeanMgr.insert(ackBean);
 
                        
SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), 
sequenceId);
                }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Tue Nov 28 03:08:23 2006
@@ -393,8 +393,6 @@
                SenderBean terminateBean = new SenderBean();
                terminateBean.setMessageContextRefKey(key);
 
-               storageManager.storeMessageContext(key, msgContext);
-
                // Set a retransmitter lastSentTime so that terminate will be 
send with
                // some delay.
                // Otherwise this get send before return of the current request 
(ack).
@@ -420,8 +418,6 @@
 
                SenderBeanMgr retramsmitterMgr = 
storageManager.getRetransmitterBeanMgr();
 
-               retramsmitterMgr.insert(terminateBean);
-
                SequencePropertyBean terminateAdded = new 
SequencePropertyBean();
                
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
                terminateAdded.setSequencePropertyKey(outSequenceID);
@@ -430,7 +426,9 @@
                seqPropMgr.insert(terminateAdded);
                
                SandeshaUtil.executeAndStore(rmMsgCtx, key);
-               
+       
+               retramsmitterMgr.insert(terminateBean);
+
                // Pause the message context
                rmMsgCtx.pause();
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
 Tue Nov 28 03:08:23 2006
@@ -127,8 +127,6 @@
                                
makeConnectionRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
                                                sequencePropertyKey);
                                
-                               
storageManager.storeMessageContext(makeConnectionMsgStoreKey,makeConnectionRMMessage.getMessageContext());
-                               
                                //add an entry for the MakeConnection message 
to the sender (with ,send=true, resend=false)
                                SenderBean makeConnectionSenderBean = new 
SenderBean ();
 //                             
makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
@@ -147,9 +145,9 @@
                                //this message should not be sent until it is 
qualified. I.e. till it is sent through the Sandesha2TransportSender.
                                
makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
                                
-                               senderBeanMgr.insert(makeConnectionSenderBean);
-                               
                                
SandeshaUtil.executeAndStore(makeConnectionRMMessage, 
makeConnectionMsgStoreKey);
+                               
+                               senderBeanMgr.insert(makeConnectionSenderBean); 
                        
                        } catch (SandeshaStorageException e) {
                                e.printStackTrace();
                        } catch (SandeshaException e) {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
 Tue Nov 28 03:08:23 2006
@@ -73,7 +73,7 @@
                
                
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
                
-               storageManager.updateMessageContext(key,msgContext);
+               storageManager.storeMessageContext(key,msgContext);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: Sandesha2TransportSender::invoke");

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
Tue Nov 28 03:08:23 2006
@@ -1070,8 +1070,17 @@
 
                if (msgContext.isPaused())
                        engine.resumeSend(msgContext);
-               else
+               else {
+                       //this invocation has to be a blocking one.
+                       
+                       Boolean isTransportNonBlocking = (Boolean) 
msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+                       if (isTransportNonBlocking!=null && 
isTransportNonBlocking.booleanValue())
+                               
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+                       
                        engine.send(msgContext);
+                       
+                       
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, 
isTransportNonBlocking);
+               }
 
                if (log.isDebugEnabled())
                        log.debug("Exit: SandeshaUtil::executeAndStore");

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
 Tue Nov 28 03:08:23 2006
@@ -416,8 +416,6 @@
                SenderBean terminateBean = new SenderBean();
                terminateBean.setMessageContextRefKey(key);
 
-               storageManager.storeMessageContext(key, 
terminateRMMessage.getMessageContext());
-
                // Set a retransmitter lastSentTime so that terminate will be 
send with
                // some delay.
                // Otherwise this get send before return of the current request 
(ack).
@@ -438,10 +436,6 @@
                if (to!=null)
                        terminateBean.setToAddress(to.getAddress());
 
-               SenderBeanMgr retramsmitterMgr = 
storageManager.getRetransmitterBeanMgr();
-
-               retramsmitterMgr.insert(terminateBean);
-
                SequencePropertyBean terminateAdded = new 
SequencePropertyBean();
                
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
                terminateAdded.setSequencePropertyKey(outSequenceId);
@@ -456,6 +450,9 @@
                
                // / addTerminateSeqTransaction.commit();
                SandeshaUtil.executeAndStore(terminateRMMessage, key);
+               
+               SenderBeanMgr retramsmitterMgr = 
storageManager.getRetransmitterBeanMgr();
+               retramsmitterMgr.insert(terminateBean);
 
                if(log.isDebugEnabled())
                        log.debug("Exit: 
TerminateManager::addTerminateSequenceMessage");



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to