Author: mlovett
Date: Tue Nov 28 03:08:23 2006
New Revision: 479987
URL: http://svn.apache.org/viewvc?view=rev&rev=479987
Log:
Andy's patch to ensure each message is only stored once, instead of being
stored and then updated. See SANDESHA2-52.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
Tue Nov 28 03:08:23 2006
@@ -269,16 +269,15 @@
ackBean.setTimeToSend(timeToSend);
- storageManager.storeMessageContext(key, ackMsgCtx);
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
Sandesha2Constants.VALUE_FALSE);
- // inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
-
// passing the message through sandesha2sender
SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
+
SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
msgContext.pause();
@@ -386,8 +385,6 @@
SenderBean ackRequestBean = new SenderBean();
ackRequestBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
// Set a retransmitter lastSentTime so that terminate will be
send with
// some delay.
// Otherwise this get send before return of the current request
(ack).
@@ -413,9 +410,9 @@
ackRequestBean.setInternalSequenceID(internalSeqenceID);
ackRequestBean.setSequenceID(outSequenceID);
- retramsmitterMgr.insert(ackRequestBean);
-
SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
+
+ retramsmitterMgr.insert(ackRequestBean);
if (log.isDebugEnabled())
log.debug("Exit:
AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Tue Nov 28 03:08:23 2006
@@ -577,12 +577,11 @@
createSeqEntry.setToAddress(to.getAddress());
createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
Sandesha2Constants.VALUE_FALSE);
-
storageManager.storeMessageContext(createSequenceMessageStoreKey,
createSeqMsg); // storing the message
+ SandeshaUtil.executeAndStore(createSeqRMMessage,
createSequenceMessageStoreKey);
+
retransmitterMgr.insert(createSeqEntry);
- SandeshaUtil.executeAndStore(createSeqRMMessage,
createSequenceMessageStoreKey);
-
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::addCreateSequenceMessage");
}
@@ -752,16 +751,16 @@
appMsgEntry.setToAddress(to.getAddress());
appMsgEntry.setInternalSequenceID(internalSequenceId);
- storageManager.storeMessageContext(storageKey, msg);
msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
Sandesha2Constants.VALUE_FALSE);
- retransmitterMgr.insert(appMsgEntry);
// increasing the current handler index, so that the message
will not be
// going throught the SandeshaOutHandler again.
msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
SandeshaUtil.executeAndStore(rmMsg, storageKey);
+
+ retransmitterMgr.insert(appMsgEntry);
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::processResponseMessage");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
Tue Nov 28 03:08:23 2006
@@ -249,8 +249,6 @@
SenderBean closeBean = new SenderBean();
closeBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
closeBean.setTimeToSend(System.currentTimeMillis());
closeBean.setMessageID(msgContext.getMessageID());
@@ -272,9 +270,9 @@
closeBean.setSequenceID(outSequenceID);
closeBean.setInternalSequenceID(internalSeqenceID);
- retramsmitterMgr.insert(closeBean);
-
SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+ retramsmitterMgr.insert(closeBean);
if (log.isDebugEnabled())
log.debug("Exit:
CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Tue Nov 28 03:08:23 2006
@@ -394,12 +394,9 @@
}
ackBean.setTimeToSend(timeToSend);
- storageManager.storeMessageContext(key, ackMsgCtx);
ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
Sandesha2Constants.VALUE_FALSE);
- // inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
// / asyncAckTransaction.commit();
// passing the message through sandesha2sender
@@ -407,6 +404,9 @@
ackRMMsgCtx =
MsgInitializer.initializeMessage(ackMsgCtx);
SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(),
sequenceId);
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Tue Nov 28 03:08:23 2006
@@ -393,8 +393,6 @@
SenderBean terminateBean = new SenderBean();
terminateBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
// Set a retransmitter lastSentTime so that terminate will be
send with
// some delay.
// Otherwise this get send before return of the current request
(ack).
@@ -420,8 +418,6 @@
SenderBeanMgr retramsmitterMgr =
storageManager.getRetransmitterBeanMgr();
- retramsmitterMgr.insert(terminateBean);
-
SequencePropertyBean terminateAdded = new
SequencePropertyBean();
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
terminateAdded.setSequencePropertyKey(outSequenceID);
@@ -430,7 +426,9 @@
seqPropMgr.insert(terminateAdded);
SandeshaUtil.executeAndStore(rmMsgCtx, key);
-
+
+ retramsmitterMgr.insert(terminateBean);
+
// Pause the message context
rmMsgCtx.pause();
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
Tue Nov 28 03:08:23 2006
@@ -127,8 +127,6 @@
makeConnectionRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
sequencePropertyKey);
-
storageManager.storeMessageContext(makeConnectionMsgStoreKey,makeConnectionRMMessage.getMessageContext());
-
//add an entry for the MakeConnection message
to the sender (with ,send=true, resend=false)
SenderBean makeConnectionSenderBean = new
SenderBean ();
//
makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
@@ -147,9 +145,9 @@
//this message should not be sent until it is
qualified. I.e. till it is sent through the Sandesha2TransportSender.
makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
Sandesha2Constants.VALUE_FALSE);
- senderBeanMgr.insert(makeConnectionSenderBean);
-
SandeshaUtil.executeAndStore(makeConnectionRMMessage,
makeConnectionMsgStoreKey);
+
+ senderBeanMgr.insert(makeConnectionSenderBean);
} catch (SandeshaStorageException e) {
e.printStackTrace();
} catch (SandeshaException e) {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
Tue Nov 28 03:08:23 2006
@@ -73,7 +73,7 @@
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
- storageManager.updateMessageContext(key,msgContext);
+ storageManager.storeMessageContext(key,msgContext);
if (log.isDebugEnabled())
log.debug("Exit: Sandesha2TransportSender::invoke");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Tue Nov 28 03:08:23 2006
@@ -1070,8 +1070,17 @@
if (msgContext.isPaused())
engine.resumeSend(msgContext);
- else
+ else {
+ //this invocation has to be a blocking one.
+
+ Boolean isTransportNonBlocking = (Boolean)
msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+ if (isTransportNonBlocking!=null &&
isTransportNonBlocking.booleanValue())
+
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+
engine.send(msgContext);
+
+
msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING,
isTransportNonBlocking);
+ }
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::executeAndStore");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Tue Nov 28 03:08:23 2006
@@ -416,8 +416,6 @@
SenderBean terminateBean = new SenderBean();
terminateBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key,
terminateRMMessage.getMessageContext());
-
// Set a retransmitter lastSentTime so that terminate will be
send with
// some delay.
// Otherwise this get send before return of the current request
(ack).
@@ -438,10 +436,6 @@
if (to!=null)
terminateBean.setToAddress(to.getAddress());
- SenderBeanMgr retramsmitterMgr =
storageManager.getRetransmitterBeanMgr();
-
- retramsmitterMgr.insert(terminateBean);
-
SequencePropertyBean terminateAdded = new
SequencePropertyBean();
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
terminateAdded.setSequencePropertyKey(outSequenceId);
@@ -456,6 +450,9 @@
// / addTerminateSeqTransaction.commit();
SandeshaUtil.executeAndStore(terminateRMMessage, key);
+
+ SenderBeanMgr retramsmitterMgr =
storageManager.getRetransmitterBeanMgr();
+ retramsmitterMgr.insert(terminateBean);
if(log.isDebugEnabled())
log.debug("Exit:
TerminateManager::addTerminateSequenceMessage");
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]