Author: parsonsd
Date: Fri Apr 3 18:27:57 2009
New Revision: 761754
URL: http://svn.apache.org/viewvc?rev=761754&view=rev
Log:
Fix to allow automatic reallocation of sequences that have timed out or been
deleted. The solution is to have a reallocated RMSBean point at the RMSBean
created as part of the reallocation via a new RMSBean attribute that contains
the internalSeqID of the newly created RMSBean.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
Fri Apr 3 18:27:57 2009
@@ -245,6 +245,23 @@
String ENDPOINT = "Endpoint";
String UNSUPPORTED_ELEMENT = "UnsupportedElement";
+
+ //This is to identify an RMSBean that hasn't been reallocated
+ int NOT_REALLOCATED = 0;
+
+ //This is to identify an RMSBean that is to be reallocated or
has been reallocated
+ int REALLOCATED = 1;
+
+ //This is to identify an RMSBean that was created for
reallocation but then was reallocated itself
+ //That way we know it can be deleted
+ int ORIGINAL_REALLOCATED_BEAN_COMPLETE = 2;
+
+ //This is to identify the RMS Bean that was created to
reallocate another RMSBean
+ int RMS_BEAN_USED_FOR_REALLOCATION = 3;
+
+ //This is to identify an RMSBean that was attempted to be
reallocated but for some reason the reallocation failed.
+ int REALLOCATION_FAILED = -1;
+
}
public interface WSA {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
Fri Apr 3 18:27:57 2009
@@ -443,8 +443,19 @@
if (terminatedSequence) {
// Delete the rmsBean
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+
+ if(tran != null && tran.isActive())
tran.commit();
+ tran = storageManager.getTransaction();
+
+ //Need to check if it's an RMSBean created for
reallocation. If so we need to
+ //delete the original RMSBean that was
reallocated.
+ RMSBean reallocatedRMSBean =
SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager,
rmsBean.getInternalSequenceID());
+ if(reallocatedRMSBean != null){
+ if (log.isDebugEnabled())
+ log.debug("Removing Reallocated
RMSBean " + reallocatedRMSBean);
+
storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+ }
}
-
if(tran != null && tran.isActive()) tran.commit();
tran = null;
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
Fri Apr 3 18:27:57 2009
@@ -79,8 +79,8 @@
public static final String propertyInvalidValue="propertyInvalidValue";
public static final String invalidRange="invalidRange";
public static final String workAlreadyAssigned="workAlreadyAssigned";
- public static final String reallocationFailed="reallocationFailed";
-
+ public static final String reallocationFailed="reallocationFailed";
+ public static final String
reallocationForSyncRequestReplyNotSupported="reallocationForSyncRequestReplyNotSupported";
public static final String
rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";
public static final String unknownWSAVersion="unknownWSAVersion";
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Fri Apr 3 18:27:57 2009
@@ -177,8 +177,6 @@
if (msgContext.getMessageID() == null)
msgContext.setMessageID(SandeshaUtil.getUUID());
-
-
/*
* Internal sequence id is the one used to refer to the
sequence (since
* actual sequence id is not available when first msg arrives)
server
@@ -230,13 +228,47 @@
RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
+ boolean autoStartNewSeqForReallocation = false;
//if this is an existing sequence then we need to do some
checks first
if(rmsBean != null)
{
+ //If the sequence has been reallocated we need to find
out the new internalSeqID.
+ //If the internalSeqID hasn't been set yet we should
auto restart. If it has a new
+ //internalSeqID we just send the message on the new
reallocated sequence.
+ int seqReallocated = rmsBean.isReallocated();
+ if(seqReallocated ==
Sandesha2Constants.WSRM_COMMON.REALLOCATED){
+ if (log.isDebugEnabled())
+ log.debug("ApplicationMsgProcessor:
Reallocated Sequence: " + rmsBean.getSequenceID());
+ //Try and get the new internalSeqID
+ internalSequenceId =
rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+ if(internalSequenceId != null){
+ if (log.isDebugEnabled())
+
log.debug("ApplicationMsgProcessor: InternalSeqID of new sequence: " +
internalSequenceId);
+
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
internalSequenceId);
+ rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
+ } else {
+ autoStartNewSeqForReallocation = true;
+ }
+ } else if(seqReallocated ==
Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED){
+ //We can't do any more as we have already tried
to reallocate this sequence.
+ throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed,
rmsBean.getSequenceID(),
+ "We have already attempted to
reallocate this Sequence and we won't try again. The sequance needs to be
cleaned up manually."));
+ }
+
//see if the sequence is closed
- if(rmsBean.isSequenceClosedClient() ||
rmsBean.isTerminateAdded() || rmsBean.isTimedOut()){
+ if(rmsBean.isSequenceClosedClient() ||
rmsBean.isTerminateAdded() || rmsBean.isTimedOut() ||
autoStartNewSeqForReallocation){
if(SandeshaUtil.isAutoStartNewSequence(msgContext)){
internalSequenceId =
getSequenceID(rmMsgCtx, serverSide, true); //require a new sequence
+ if(autoStartNewSeqForReallocation){
+ if (log.isDebugEnabled())
+
log.debug("ApplicationMsgProcessor: autoStartNewSeqForReallocation:
InternalSeqID of new sequence used for reallocation: "
+
+ internalSequenceId);
+
rmsBean.setInternalSeqIDOfSeqUsedForReallocation(internalSequenceId);
+
storageManager.getRMSBeanMgr().update(rmsBean);
+
+ if(tran != null &&
tran.isActive()) tran.commit();
+ tran =
storageManager.getTransaction();
+ }
if (log.isDebugEnabled())
log.debug("ApplicationMsgProcessor: auto start new sequence " +
internalSequenceId + " :: " + rmsBean);
//set this new internal sequence ID on
the msg
@@ -337,6 +369,11 @@
if (rmsBean == null) {
rmsBean =
SequenceManager.setupNewClientSequence(msgContext, internalSequenceId,
storageManager);
rmsBean =
addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+
+
if(autoStartNewSeqForReallocation){
+
rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.RMS_BEAN_USED_FOR_REALLOCATION);
+ }
+
if(rmsBean != null)
outSequenceID = rmsBean.getSequenceID();
if (rmsBean == null &&
appMsgProcTran != null && appMsgProcTran.isActive()) {
@@ -348,7 +385,6 @@
appMsgProcTran =
storageManager.getTransaction();
}
}
-
}
} else {
@@ -554,6 +590,7 @@
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+
return true;
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Fri Apr 3 18:27:57 2009
@@ -154,7 +154,7 @@
if(!rmsBeanMgr.update(rmsBean)){
//I'm not setting the createSeqBean sender bean to
resend true as the reallocation of msgs will do this
try{
- TerminateManager.terminateSendingSide(rmsBean,
storageManager, true);
+ TerminateManager.terminateSendingSide(rmsBean,
storageManager, true, transaction);
} catch(Exception e){
if (log.isDebugEnabled())
log.debug(e);
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
Fri Apr 3 18:27:57 2009
@@ -74,7 +74,7 @@
}
}
- TerminateManager.terminateSendingSide (rmsBean, storageManager,
false);
+ TerminateManager.terminateSendingSide (rmsBean, storageManager,
false, null);
// Stop this message travelling further through the Axis runtime
terminateResRMMsg.pause();
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
Fri Apr 3 18:27:57 2009
@@ -19,6 +19,7 @@
package org.apache.sandesha2.storage.beans;
+import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.util.Range;
import org.apache.sandesha2.util.RangeString;
@@ -152,6 +153,22 @@
* be ignored within the match method.
*/
private int rmsFlags = 0;
+
+ /**
+ * Indicates the reallocation state. The state is one of the
+ * Sandesha2Constants.WSRM_COMMON values:
+ * NOT_REALLOCATED - The bean hasn't been reallocated
+ * REALLOCATED - The bean is to be reallocated or has been reallocated
+ * ORIGINAL_REALLOCATED_BEAN_COMPLETE - The bean was created for reallocation but
is no longer needed as it has itself been reallocated
+ * RMS_BEAN_USED_FOR_REALLOCATION - The bean was created to reallocate another RMSBean
+ * REALLOCATION_FAILED - The reallocation of this bean failed
+ */
+ private int reallocated =
Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED;
+
+ /**
+ * Contains the internalSeqID of the seq that has sent the reallocated
msgs
+ */
+ private String internalSeqIDOfSeqUsedForReallocation = null;
+
public static final int LAST_SEND_ERROR_TIME_FLAG = 0x00000001;
public static final int LAST_OUT_MSG_FLAG = 0x00000010;
public static final int HIGHEST_OUT_MSG_FLAG = 0x00000100;
@@ -195,7 +212,9 @@
terminationPauserForCS = beanToCopy.isTerminationPauserForCS();
timedOut = beanToCopy.isTimedOut();
transportTo = beanToCopy.getTransportTo();
- avoidAutoTermination = beanToCopy.isAvoidAutoTermination();
+ avoidAutoTermination = beanToCopy.isAvoidAutoTermination();
+ reallocated = beanToCopy.isReallocated();
+ internalSeqIDOfSeqUsedForReallocation =
beanToCopy.getInternalSeqIDOfSeqUsedForReallocation();
}
public String getCreateSeqMsgID() {
@@ -434,6 +453,8 @@
result.append("\nClientCompletedMsgs: ");
result.append(clientCompletedMessages);
result.append("\nAnonymous UUID : ");
result.append(anonymousUUID);
result.append("\nSOAPVersion : "); result.append(soapVersion);
+ result.append("\nReallocated : "); result.append(reallocated);
+ result.append("\nInternalSeqIDOfSeqUsedForReallocation : ");
result.append(internalSeqIDOfSeqUsedForReallocation);
return result.toString();
}
@@ -478,6 +499,9 @@
else if(bean.getAnonymousUUID() != null &&
!bean.getAnonymousUUID().equals(this.getAnonymousUUID()))
match = false;
+ else if((bean.getInternalSeqIDOfSeqUsedForReallocation() !=
null &&
!bean.getInternalSeqIDOfSeqUsedForReallocation().equals(this.getInternalSeqIDOfSeqUsedForReallocation())))
+ match = false;
+
// Avoid matching on the error information
// else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 &&
bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp())
// match = false;
@@ -511,8 +535,26 @@
else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 &&
bean.getExpectedReplies() != this.getExpectedReplies())
match = false;
+
+
return match;
}
+ public int isReallocated() {
+ return reallocated;
+ }
+
+ public void setReallocated(int reallocated) {
+ this.reallocated = reallocated;
+ }
+
+ public String getInternalSeqIDOfSeqUsedForReallocation() {
+ return internalSeqIDOfSeqUsedForReallocation;
+ }
+
+ public void setInternalSeqIDOfSeqUsedForReallocation(String
internalSeqIDOfSeqUsedForReallocation) {
+ this.internalSeqIDOfSeqUsedForReallocation =
internalSeqIDOfSeqUsedForReallocation;
+ }
+
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
Fri Apr 3 18:27:57 2009
@@ -619,7 +619,7 @@
if (log.isDebugEnabled())
log.debug("Sending fault
message " + faultMessageContext.getEnvelope().getHeader());
- // Sending the message
+ //Sending the message
//having a surrounded try block will
make sure that the error is logged here
//and that this does not disturb the
processing of a carrier message.
try {
@@ -671,7 +671,7 @@
}
- private static InvocationResponse manageIncomingFault (AxisFault fault,
RMMsgContext rmMsgCtx, SOAPFault faultPart) throws AxisFault {
+ private static InvocationResponse manageIncomingFault (AxisFault fault,
RMMsgContext rmMsgCtx, SOAPFault faultPart, Transaction transaction) throws
AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::manageIncomingFault");
InvocationResponse response = InvocationResponse.CONTINUE;
@@ -743,7 +743,7 @@
} else if
(Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode)
||
Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode)
||
Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equals(soapFaultSubcode))
{
- processSequenceUnknownFault(rmMsgCtx, fault,
identifier);
+ processSequenceUnknownFault(rmMsgCtx, fault,
identifier, transaction);
}
// If the operation is an Sandesha In Only operation, or the
fault is a recognised fault,
@@ -783,7 +783,7 @@
// constructing the fault
AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart,
rmMsgCtx);
- response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+ response = manageIncomingFault (axisFault, rmMsgCtx, faultPart,
transaction);
if(transaction != null && transaction.isActive())
transaction.commit();
transaction = null;
@@ -966,7 +966,7 @@
// Cleanup sending side.
if (log.isDebugEnabled())
log.debug("Terminating sending sequence " + rmsBean);
- TerminateManager.terminateSendingSide(rmsBean, storageManager,
false);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager,
false, null);
if (log.isDebugEnabled())
log.debug("Exit:
FaultManager::processCreateSequenceRefusedFault");
@@ -980,7 +980,7 @@
* @param fault
* @param identifier
*/
- private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx,
AxisFault fault, String sequenceID) throws AxisFault {
+ private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx,
AxisFault fault, String sequenceID, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
FaultManager::processSequenceUnknownFault " + sequenceID);
@@ -998,16 +998,16 @@
// Cleanup sending side.
if (log.isDebugEnabled())
log.debug("Terminating sending sequence " +
rmsBean);
- if(!TerminateManager.terminateSendingSide(rmsBean,
storageManager, true)){
+ if(!TerminateManager.terminateSendingSide(rmsBean,
storageManager, true, transaction)){
// We did not reallocate so we notify the
clients of a failure
notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager,
configCtx, fault);
+
+ //Mark the RMSBean as reallocation failed and
update last activation time
+ transaction = storageManager.getTransaction();
+
rmsBean.setLastActivatedTime(System.currentTimeMillis());
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
}
-
- // Update the last activated time.
-
rmsBean.setLastActivatedTime(System.currentTimeMillis());
-
- // Update the bean in the map
- storageManager.getRMSBeanMgr().update(rmsBean);
}
else {
RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
Fri Apr 3 18:27:57 2009
@@ -44,6 +44,7 @@
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.client.async.Callback;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
@@ -59,6 +60,8 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.Handler;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -74,6 +77,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -1015,10 +1019,22 @@
return targetEnv;
}
-
- public static void reallocateMessagesToNewSequence(StorageManager
storageManager, RMSBean oldRMSBean, List<MessageContext> msgsToSend)throws
AxisFault{
- if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
- log.debug("Enter:
SandeshaUtil::reallocateMessagesToNewSequence");
+
+
+ /**
+ * Reallocates messages to a new sequence
+ * @param storageManager
+ * @param oldRMSBean
+ * @param msgsToSend
+ * @param transaction
+ *
+ */
+ public static void reallocateMessagesToNewSequence(StorageManager
storageManager, RMSBean oldRMSBean,
+ List<MessageContext> msgsToSend, Transaction transaction)
+ throws AxisFault, SandeshaException{
+
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("Enter:
SandeshaUtil::reallocateMessagesToNewSequence");
ConfigurationContext ctx = storageManager.getContext();
ServiceClient client = new ServiceClient(ctx, null);
@@ -1027,30 +1043,68 @@
Options options = client.getOptions();
options.setTo(oldRMSBean.getToEndpointReference());
options.setReplyTo(oldRMSBean.getReplyToEndpointReference());
-
- //internal sequence ID is different
- String internalSequenceID = oldRMSBean.getInternalSequenceID();
- //we also need to obtain the sequenceKey from the internalSequenceID.
- String oldSequenceKey =
-
SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID,
oldRMSBean.getToEndpointReference().getAddress());
- //remove the old sequence key from the internal sequence ID
- internalSequenceID = internalSequenceID.substring(0,
internalSequenceID.length()-oldSequenceKey.length());
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,
- SandeshaUtil.getUUID()); //using a new sequence Key to
differentiate from the old sequence
-
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
internalSequenceID);
- options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,
oldRMSBean.getRMVersion());
-
options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES,
Boolean.FALSE);
-
- //send the msgs - this will setup a new sequence to the same endpoint
- Iterator<MessageContext> it = msgsToSend.iterator();
- while(it.hasNext()){
- MessageContext msgCtx = (MessageContext)it.next();
- client.getOptions().setAction(msgCtx.getWSAAction());
-
client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
- }
-
- if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
- log.debug("Exit:
SandeshaUtil::reallocateMessagesToNewSequence");
+
+ //internal sequence ID is different
+ String internalSequenceID = oldRMSBean.getInternalSequenceID();
+
+
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
internalSequenceID);
+ options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,
oldRMSBean.getRMVersion());
+
options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES,
Boolean.FALSE);
+
+ //Update the RMSBean so as to mark it as reallocated if it
isn't an RMSbean created for a previous reallocation
+ RMSBean originallyReallocatedRMSBean =
SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager,
oldRMSBean.getInternalSequenceID());
+ if(originallyReallocatedRMSBean == null){
+
oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATED);
+ storageManager.getRMSBeanMgr().update(oldRMSBean);
+ } else {
+
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
originallyReallocatedRMSBean.getInternalSequenceID());
+
originallyReallocatedRMSBean.setInternalSeqIDOfSeqUsedForReallocation(null);
+
storageManager.getRMSBeanMgr().update(originallyReallocatedRMSBean);
+
+ //Setting this property so that the bean can be deleted
+
oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE);
+
oldRMSBean.setInternalSeqIDOfSeqUsedForReallocation(originallyReallocatedRMSBean.getInternalSequenceID());
+ storageManager.getRMSBeanMgr().update(oldRMSBean);
+ }
+
+ //Commit current transaction that wraps the manageFaultMsg as
we are about to start
+ //resending msgs on a new seq and they will need to get a
transaction on the
+ //current thread
+ if(transaction != null && transaction.isActive())
transaction.commit();
+
+ //send the msgs - this will setup a new sequence to the same
endpoint
+ Iterator<MessageContext> it = msgsToSend.iterator();
+
+ while(it.hasNext()){
+ MessageContext msgCtx = (MessageContext)it.next();
+
+ //Set the action
+ client.getOptions().setAction(msgCtx.getWSAAction());
+
+ //Set the message ID
+ client.getOptions().setMessageId(msgCtx.getMessageID());
+
+ //Get the AxisOperation
+ AxisOperation axisOperation = msgCtx.getAxisOperation();
+
+ //If it's oneway or async, reallocate
+ //Fail if replyTo is anonymous as this is currently
not supported because in twoway we can't get responses back to the old something
+ if(axisOperation.getAxisSpecificMEPConstant() ==
WSDLConstants.MEP_CONSTANT_OUT_ONLY){
+
client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
+ } else if
(client.getOptions().getReplyTo().hasAnonymousAddress()){
+
oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+
storageManager.getRMSBeanMgr().update(oldRMSBean);
+ throw new
SandeshaException(SandeshaMessageKeys.reallocationForSyncRequestReplyNotSupported);
+ } else {
+ MessageReceiver msgReceiver =
axisOperation.getMessageReceiver();
+ Object callback =
((CallbackReceiver)msgReceiver).lookupCallback(msgCtx.getMessageID());
+ client.setAxisService(msgCtx.getAxisService());
+
client.sendReceiveNonBlocking(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement(),
(Callback)callback);
+ }
+ }
+
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
+ log.debug("Exit:
SandeshaUtil::reallocateMessagesToNewSequence");
}
/**
@@ -1276,4 +1330,16 @@
return result;
}
+ public static RMSBean isLinkedToReallocatedRMSBean(StorageManager
storageManager, String internalSeqID) throws SandeshaException {
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("Enter:
SandeshaUtil::isLinkedToReallocatedRMSBean");
+
+ //Need to check if it's an RMSBean created for reallocation.
+ RMSBean finderBean = new RMSBean();
+
finderBean.setInternalSeqIDOfSeqUsedForReallocation(internalSeqID);
+ RMSBean reallocatedRMSBean =
storageManager.getRMSBeanMgr().findUnique(finderBean);
+
+ if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled()) log.debug("Enter:
SandeshaUtil::isLinkedToReallocatedRMSBean, ReallocatedRMSBean: " +
reallocatedRMSBean);
+ return reallocatedRMSBean;
+ }
+
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
Fri Apr 3 18:27:57 2009
@@ -25,8 +25,11 @@
import java.util.LinkedList;
import java.util.List;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -40,12 +43,13 @@
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.InvokerBean;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
/**
@@ -231,14 +235,14 @@
* @return true if the reallocation happened successfully
*/
public static boolean terminateSendingSide(RMSBean rmsBean,
- StorageManager storageManager, boolean reallocate)
throws SandeshaException {
+ StorageManager storageManager, boolean reallocate,
Transaction transaction) throws SandeshaException {
// Indicate that the sequence is terminated
rmsBean.setTerminated(true);
rmsBean.setTerminateAdded(true);
storageManager.getRMSBeanMgr().update(rmsBean);
- return cleanSendingSideData (rmsBean.getInternalSequenceID(),
storageManager, rmsBean, reallocate);
+ return cleanSendingSideData (rmsBean.getInternalSequenceID(),
storageManager, rmsBean, reallocate, transaction);
}
public static void timeOutSendingSideSequence(String internalSequenceId,
@@ -249,11 +253,11 @@
rmsBean.setLastActivatedTime(System.currentTimeMillis());
storageManager.getRMSBeanMgr().update(rmsBean);
- cleanSendingSideData(internalSequenceId, storageManager,
rmsBean, false);
+ cleanSendingSideData(internalSequenceId, storageManager,
rmsBean, false, null);
}
private static boolean cleanSendingSideData(String internalSequenceId,
StorageManager storageManager,
- RMSBean rmsBean, boolean reallocateIfPossible) throws
SandeshaException {
+ RMSBean rmsBean, boolean reallocateIfPossible,
Transaction transaction) throws SandeshaException {
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter:
TerminateManager::cleanSendingSideData " + internalSequenceId + ", " +
reallocateIfPossible);
@@ -274,12 +278,15 @@
if(ranges.length==1){
//the sequence is a single contiguous acked range
lastAckedMsg = ranges[0].upperValue;
- }
- else{
- //cannot reallocate as there are gaps
- reallocateIfPossible=false;
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
- log.debug("cannot reallocate sequence as there are gaps");
+ } else{
+ if(reallocateIfPossible){
+ //cannot reallocate as there are gaps
+ rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ reallocateIfPossible=false;
+ if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("cannot reallocate sequence as there are gaps");
+ }
}
while (iterator.hasNext()) {
@@ -332,14 +339,48 @@
if(reallocateIfPossible){
try{
- SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate);
- reallocatedOK = true;
- }
- catch(Exception e){
- //want that the reallocation failed
+ SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate, transaction);
+ reallocatedOK = true;
+
+ //If the reallocation was successful and the RMSBean being reallocated was originally created for reallocation
+ //the RMSBean can be deleted.
+ transaction = storageManager.getTransaction();
+ if(rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE){
+ rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ }
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+ } catch(Exception e){
+
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(), e.toString()));
- }
+
+ //Reallocation Failed
+ //Need to mark any RMSBeans involved as failed so that we don't attempt to send
+ //anymore messages on these seq's. The client will have to manually reallocate and
+ //administer the sequences.
+ transaction = storageManager.getTransaction();
+
+ rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ String intSeqIDOfOriginallyReallocatedSeq = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+ if(intSeqIDOfOriginallyReallocatedSeq != null){
+ RMSBean origRMSBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, intSeqIDOfOriginallyReallocatedSeq);
+ origRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+ storageManager.getRMSBeanMgr().update(origRMSBean);
+ }
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
+ } finally {
+ if (transaction != null && transaction.isActive()) {
+ transaction.rollback();
+ }
+ }
}
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Fri Apr 3 18:27:57 2009
@@ -427,7 +427,7 @@
private void deleteRMSBeans(List<RMSBean> rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
- throws SandeshaStorageException {
+ throws SandeshaStorageException, SandeshaException {
if (log.isDebugEnabled())
log.debug("Enter: Sender::deleteRMSBeans");
@@ -437,12 +437,24 @@
RMSBean rmsBean = (RMSBean) beans.next();
long timeNow = System.currentTimeMillis();
long lastActivated = rmsBean.getLastActivatedTime();
+
// delete sequences that have been timedout or deleted for more than
// the SequenceRemovalTimeoutInterval
-
- if ((lastActivated + deleteTime) < timeNow) {
+ if (((lastActivated + deleteTime) < timeNow) &&
+ (rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED)) {
if (log.isDebugEnabled())
log.debug("Removing RMSBean " + rmsBean);
+
+ //Need to check if it's an RMSBean created for reallocation. If so we need to
+ //delete the original RMSBean that was reallocated.
+ RMSBean reallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, rmsBean.getInternalSequenceID());
+
+ if(reallocatedRMSBean != null){
+ if (log.isDebugEnabled())
+ log.debug("Removing Reallocated RMSBean " + reallocatedRMSBean);
+ storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+ }
+
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey());
}
@@ -616,7 +628,7 @@
// Mark the sequence as terminated
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(manager, id);
- TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
if(log.isDebugEnabled())
log.debug("Sender::checkForOrphanMessages. Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found. Deleting this message with a sequence ID of : " + id);
// Delete the terminate sender bean.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Fri Apr 3 18:27:57 2009
@@ -418,7 +418,7 @@
String sequenceID = terminateSequence.getIdentifier().getIdentifier();
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
- TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
if(transaction != null && transaction.isActive()) transaction.commit();
transaction = null;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Fri Apr 3 18:27:57 2009
@@ -82,7 +82,8 @@
msgContextNotSet=Sandesha2 Internal Error: ''MessageContext'' is null.
transportOutNotPresent=Sandesha2 Internal Error: original transport sender is not present.
workAlreadyAssigned=Work ''{0}'' is already assigned to a different Worker. Will try the next one.
-reallocationFailed=The sequence ''{0}'' could not be reallocated due to the error ''{1}''.
+reallocationFailed=Reallocation of msgs from sequence ''{0}'' failed, ''{1}''.
+reallocationForSyncRequestReplyNotSupported=Reallocation for sync requestReply not supported.
couldNotFindOperation=Could not find operation for message type {0} and spec level {1}.
cannotChooseAcksTo=Could not find an appropriate acksTo for the reply sequence, given inbound sequence {0} and bean info {1}.
cannotChooseSpecLevel=Could not find an appropriate specification level for the reply sequence, given inbound sequence {0} and bean info {1}.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]