Author: parsonsd
Date: Fri May 15 09:49:42 2009
New Revision: 775077
URL: http://svn.apache.org/viewvc?rev=775077&view=rev
Log:
- Fix newly added reallocation code as deadlock had been introduced.
- Changed RMSBean to store EndpointReference for offered Endpoints so ref
params don't get lost
- Update so that all backchannel msgs on MC's are processed successfully
- Commented out sending of AckRequested msgs as not needed as yet and impacts
performance
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=775077&r1=775076&r2=775077&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Fri May 15 09:49:42 2009
@@ -264,10 +264,7 @@
log.debug("ApplicationMsgProcessor: autoStartNewSeqForReallocation:
InternalSeqID of new sequence used for reallocation: "
+ internalSequenceId);
rmsBean.setInternalSeqIDOfSeqUsedForReallocation(internalSequenceId);
-
storageManager.getRMSBeanMgr().update(rmsBean);
-
- if(tran != null &&
tran.isActive()) tran.commit();
- tran =
storageManager.getTransaction();
+
storageManager.getRMSBeanMgr().update(rmsBean);
}
if (log.isDebugEnabled())
log.debug("ApplicationMsgProcessor: auto start new sequence " +
internalSequenceId + " :: " + rmsBean);
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=775077&r1=775076&r2=775077&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Fri May 15 09:49:42 2009
@@ -211,8 +211,9 @@
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning,
createSeqRMMsg.getMessageContext().getMessageID(),
offeredSequenceID));
}
- rMSBean = new
RMSBean(); //Set the
offered EP
-
rMSBean.setOfferedEndPoint(endpointAddress);
+ rMSBean = new RMSBean();
+ //Set the offered EP
+
rMSBean.setOfferedEndPointEPR(endpoint.getEPR());
} else {
//Don't accept the offer
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=775077&r1=775076&r2=775077&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
Fri May 15 09:49:42 2009
@@ -19,6 +19,7 @@
package org.apache.sandesha2.storage.beans;
+import org.apache.axis2.addressing.EndpointReference;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.util.Range;
import org.apache.sandesha2.util.RangeString;
@@ -82,6 +83,7 @@
private String transportTo;
private String offeredEndPoint = null;
+ private EndpointReference offeredEndPointEPR = null;
private String offeredSequence = null;
@@ -204,6 +206,7 @@
lastSendErrorTimestamp =
beanToCopy.getLastSendErrorTimestamp();
nextMessageNumber = beanToCopy.getNextMessageNumber();
offeredEndPoint = beanToCopy.getOfferedEndPoint();
+ offeredEndPointEPR = beanToCopy.getOfferedEndPointEPR();
offeredSequence = beanToCopy.getOfferedSequence();
referenceMessageStoreKey =
beanToCopy.getReferenceMessageStoreKey();
sequenceClosedClient = beanToCopy.isSequenceClosedClient();
@@ -444,7 +447,11 @@
result.append("\nClosedClient : ");
result.append(sequenceClosedClient);
result.append("\nExpectedReplies : ");
result.append(expectedReplies);
result.append("\nTransportTo : ");
result.append(transportTo);
- result.append("\nOfferedEndPoint : ");
result.append(offeredEndPoint);
+ if(offeredEndPointEPR != null){
+ result.append("\nOfferedEndPoint : ");
result.append(offeredEndPointEPR.getAddress());
+ } else {
+ result.append("\nOfferedEndPoint : null");
+ }
result.append("\nOfferedSequence : ");
result.append(offeredSequence);
if (lastSendErrorTimestamp > 0) {
result.append("\nLastError : ");
result.append(lastSendError);
@@ -490,6 +497,9 @@
else if(bean.getTransportTo() != null &&
!bean.getTransportTo().equals(this.getTransportTo()))
match = false;
+ else if(bean.getOfferedEndPointEPR() != null &&
!bean.getOfferedEndPointEPR().getAddress().equals(this.getOfferedEndPointEPR().getAddress()))
+ match = false;
+
else if(bean.getOfferedEndPoint() != null &&
!bean.getOfferedEndPoint().equals(this.getOfferedEndPoint()))
match = false;
@@ -557,4 +567,13 @@
this.internalSeqIDOfSeqUsedForReallocation =
internalSeqIDOfSeqUsedForReallocation;
}
+ public EndpointReference getOfferedEndPointEPR() {
+ return offeredEndPointEPR;
+ }
+
+ public void setOfferedEndPointEPR(EndpointReference offeredEndPointEPR)
{
+ this.offeredEndPointEPR = offeredEndPointEPR;
+ this.offeredEndPoint = offeredEndPointEPR.getAddress();
+ }
+
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=775077&r1=775076&r2=775077&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
Fri May 15 09:49:42 2009
@@ -411,12 +411,14 @@
//If not get it from the To property.
EndpointReference toEPR = null;
-
- if (rmsBean.getOfferedEndPoint() != null)
+
+ if(rmsBean.getOfferedEndPointEPR() != null){
+ toEPR = rmsBean.getOfferedEndPointEPR();
+ } else if(rmsBean.getOfferedEndPoint() != null){
toEPR = new EndpointReference
(rmsBean.getOfferedEndPoint());
+ }
if (toEPR==null) {
-
if (rmsBean.getToEndpointReference()!=null) {
toEPR = rmsBean.getToEndpointReference();
if (toEPR == null) {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=775077&r1=775076&r2=775077&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Fri May 15 09:49:42 2009
@@ -232,6 +232,12 @@
transaction = storageManager.getTransaction();
+ /*Removing the sending of AckRequested msgs as has
performance impact.
+ It could be rewritten to send AckRequested
headers infrequantly and
+ hence be less of a performance impact.
Functionally it
+ may be required to interop with other
implementations but until
+ the problem occurs it's best not to do it at
all and keep performance
+ as optimal as possible
//If this is an application msg we need to add an
ackRequest to the header
if(messageType ==
Sandesha2Constants.MessageTypes.APPLICATION){
//Add an ackRequest
@@ -241,7 +247,7 @@
transaction.commit();
transaction = storageManager.getTransaction();
- }
+ } */
//if this is a sync RM exchange protocol we always have
to add an ack
boolean ackPresent = false;
@@ -315,7 +321,7 @@
try {
InvocationResponse response =
InvocationResponse.CONTINUE;
- if(storageManager.requiresMessageSerialization()) {
+
if(storageManager.requiresMessageSerialization()) {
if(msgCtx.isPaused()) {
if (log.isDebugEnabled())
log.debug("Resuming a
send for message : " + msgCtx.getEnvelope().getHeader());
@@ -639,10 +645,12 @@
int responseMessageType =
responseRMMessage.getMessageType();
if(log.isDebugEnabled())
log.debug("inboundMsgType" + responseMessageType + "outgoing message type " +
messageType);
- //if this is an application response or
createSeqResponse msg in response to a make connection then we have to take
care with the service context
- if(messageType ==
Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG
+ //if this is a response msg in response to a
make connection then we have to take care with the service context
+ if((messageType ==
Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG || messageType ==
Sandesha2Constants.MessageTypes.UNKNOWN)
&& (responseMessageType ==
Sandesha2Constants.MessageTypes.APPLICATION
- ||
responseMessageType == Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE)){
+ ||
responseMessageType == Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE
+ ||
responseMessageType == Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE
+ ||
responseMessageType ==
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE_RESPONSE)){
//Setting the AxisService object
responseMessageContext.setAxisService(msgCtx.getAxisService());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]