Author: gatfora
Date: Mon Jan 8 00:42:07 2007
New Revision: 493983
URL: http://svn.apache.org/viewvc?view=rev&rev=493983
Log:
Move the lastOutMessage to the RMSBean
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Mon Jan 8 00:42:07 2007
@@ -309,10 +309,6 @@
// for an outbound sequence.
String HIGHEST_OUT_MSG_NUMBER = "HighestOutMsgNumber";
String HIGHEST_OUT_RELATES_TO = "HighestOutRelatesTo";
-
- // Once the last message for any outbound sequence has been acknowledged,
- // we can safely close the sequence.
- String LAST_OUT_MESSAGE_NO = "LastOutMessage";
String SECURITY_TOKEN = "SecurityToken";
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Jan 8 00:42:07 2007
@@ -43,6 +43,7 @@
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
@@ -226,27 +227,22 @@
seqPropMgr.update(allCompletedMsgsBean);
- String lastOutMsgNoStr =
SandeshaUtil.getSequenceProperty(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, storageManager);
- if (lastOutMsgNoStr != null) {
- long highestOutMsgNo = 0;
- if (lastOutMsgNoStr != null) {
- highestOutMsgNo =
Long.parseLong(lastOutMsgNoStr);
- }
+ RMSBean bean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
+
+ long highestOutMsgNo = bean.getLastOutMessage();
- if (highestOutMsgNo > 0) {
- boolean complete =
AcknowledgementManager.verifySequenceCompletion(sequenceAck
-
.getAcknowledgementRanges().iterator(), highestOutMsgNo);
+ if (highestOutMsgNo > 0) {
+ boolean complete =
AcknowledgementManager.verifySequenceCompletion(sequenceAck
+ .getAcknowledgementRanges().iterator(),
highestOutMsgNo);
- if (complete) {
+ if (complete) {
- //using create sequence message as the reference message.
+ //using create sequence message as the reference message.
// RMSBeanMgr createSeqBeanMgr =
storageManager.getCreateSeqBeanMgr();
// RMSBean createSeqBean =
createSeqBeanMgr.retrieve(msgId);
//
-
TerminateManager.addTerminateSequenceMessage(rmMsgCtx, sequencePropertyKey,
outSequenceId, sequencePropertyKey,
- storageManager);
- }
+
TerminateManager.addTerminateSequenceMessage(rmMsgCtx, sequencePropertyKey,
outSequenceId, sequencePropertyKey,
+ storageManager);
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Jan 8 00:42:07 2007
@@ -252,13 +252,6 @@
seqPropMgr.insert(responseRelatesToBean);
}
- if (lastMessage) {
- SequencePropertyBean responseLastMsgKeyBean = new
SequencePropertyBean(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, new
Long(messageNumber).toString());
-
- seqPropMgr.insert(responseLastMsgKeyBean);
- }
-
boolean sendCreateSequence = false;
String outSequenceID =
SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceId,
storageManager);
@@ -340,6 +333,8 @@
ServiceContext serviceContext = msgContext.getServiceContext();
OperationContext operationContext =
msgContext.getOperationContext();
+ RMSBean rmsBean = null;
+
// SENDING THE CREATE SEQUENCE.
if (sendCreateSequence) {
EndpointReference acksToEPR = null;
@@ -414,9 +409,22 @@
throw new SandeshaException (e);
}
}
- addCreateSequenceMessage(rmMsgCtx,
sequencePropertyKey ,internalSequenceId, acksToEPR, storageManager);
+ rmsBean = addCreateSequenceMessage(rmMsgCtx,
sequencePropertyKey ,internalSequenceId, acksToEPR, storageManager);
}
}
+
+ if (rmsBean == null && lastMessage) {
+ RMSBean findBean = new RMSBean();
+ findBean.setInternalSequenceID(internalSequenceId);
+ rmsBean =
storageManager.getRMSBeanMgr().findUnique(findBean);
+ }
+
+ if (lastMessage) {
+ rmsBean.setLastOutMessage(messageNumber);
+ // Update the rmsBean
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ }
+
SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
if (env == null) {
@@ -468,7 +476,7 @@
return true;
}
- private void addCreateSequenceMessage(RMMsgContext applicationRMMsg,
String sequencePropertyKey, String internalSequenceId, EndpointReference acksTo,
+ private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg,
String sequencePropertyKey, String internalSequenceId, EndpointReference acksTo,
StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
@@ -571,7 +579,8 @@
retransmitterMgr.insert(createSeqEntry);
if (log.isDebugEnabled())
- log.debug("Exit:
ApplicationMsgProcessor::addCreateSequenceMessage");
+ log.debug("Exit:
ApplicationMsgProcessor::addCreateSequenceMessage, " + rMSBean);
+ return rMSBean;
}
private void processResponseMessage(RMMsgContext rmMsg, String
internalSequenceId, String outSequenceID, long messageNumber,
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Jan 8 00:42:07 2007
@@ -42,6 +42,7 @@
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
@@ -204,9 +205,9 @@
// It is possible that the message has gone out, but not been acked yet. In that case
// we can store the HIGHEST_OUT_MSG_NUMBER as the LAST_OUT_MESSAGE_NO, so that when the
// ack arrives we will terminate the sequence
- SequencePropertyBean lastOutMsgNoBean =
new SequencePropertyBean(responseSideSequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,
highOutMessageNumberString);
- seqPropMgr.insert(lastOutMsgNoBean);
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
responseSideInternalSequenceId);
+
rmsBean.setLastOutMessage(highestOutMsgNo);
+
storageManager.getRMSBeanMgr().update(rmsBean);
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java Mon Jan 8 00:42:07 2007
@@ -22,17 +22,11 @@
* There is one entry for each sequence.
*/
-public class RMDBean extends RMBean {
+public class RMDBean extends RMSequenceBean {
private static final long serialVersionUID = -2976123838615087562L;
/**
- * Comment for <code>sequenceID</code>
- * The sequenceID of the representing sequence.
- */
- private String sequenceID;
-
- /**
* Comment for <code>nextMsgNoToProcess</code>
* The next message to be invoked of the representing sequence.
*/
@@ -61,7 +55,7 @@
}
public RMDBean(String sequenceID, long nextNsgNo) {
- this.sequenceID = sequenceID;
+ super(sequenceID);
this.nextMsgNoToProcess = nextNsgNo;
}
@@ -80,21 +74,6 @@
this.nextMsgNoToProcess = nextMsgNoToProcess;
}
- /**
- * @return Returns the sequenceId.
- */
- public String getSequenceID() {
- return sequenceID;
- }
-
- /**
- * @param sequenceId
- * The sequenceId to set.
- */
- public void setSequenceID(String sequenceID) {
- this.sequenceID = sequenceID;
- }
-
public boolean isPollingMode() {
return pollingMode;
}
@@ -130,7 +109,7 @@
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
- result.append("\nSequence Id: "); result.append(sequenceID);
+ result.append(super.toString());
result.append("\nNext Msg # : ");
result.append(nextMsgNoToProcess);
result.append("\nPolling : "); result.append(pollingMode);
result.append("\nRef Msg Key: ");
result.append(referenceMessageKey);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java Mon Jan 8 00:42:07 2007
@@ -22,7 +22,7 @@
* There is on object of this for each sequence.
*/
-public class RMSBean extends RMBean {
+public class RMSBean extends RMSequenceBean {
private static final long serialVersionUID = 7051201094510208784L;
@@ -40,12 +40,6 @@
* This is the message ID of the create sequence message.
*/
private String createSeqMsgID;
-
- /**
- * Comment for <code>sequenceID</code>
- * This is the actual Sequence ID of the sequence.
- */
- private String sequenceID;
/**
* Comment for <code>securityTokenData</code>
@@ -84,7 +78,12 @@
* This is the timestamp of when the last error occured when sending
*/
private long lastSendErrorTimestamp = -1;
-
+
+ /**
+ * The last Out message number
+ */
+ private long lastOutMessage = 0;
+
public RMSBean() {
}
@@ -97,14 +96,6 @@
this.createSeqMsgID = createSeqMsgID;
}
- public String getSequenceID() {
- return sequenceID;
- }
-
- public void setSequenceID(String sequenceID) {
- this.sequenceID = sequenceID;
- }
-
public String getInternalSequenceID() {
return internalSequenceID;
}
@@ -164,16 +155,28 @@
this.lastSendErrorTimestamp = lastSendErrorTimestamp;
}
+
+ public long getLastOutMessage() {
+ return lastOutMessage;
+ }
+
+ public void setLastOutMessage(long lastOutMessage) {
+ this.lastOutMessage = lastOutMessage;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
- result.append("\nSequence Id : ");
result.append(sequenceID);
+ result.append(super.toString());
result.append("\nInternal Seq Id : ");
result.append(internalSequenceID);
result.append("\nCreateSeq Msg Id : ");
result.append(createSeqMsgID);
result.append("\nHas SecurityToken: ");
result.append(securityTokenData != null && securityTokenData.length() > 0);
result.append("\nCreateSeq Msg Key: ");
result.append(createSequenceMsgStoreKey);
result.append("\nReference Msg Key: ");
result.append(referenceMessageStoreKey);
result.append("\nPolling : ");
result.append(pollingMode);
+ result.append("\nLastOutMessageNumber: ");
result.append(lastOutMessage);
return result.toString();
}
+
+
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java Mon Jan 8 00:42:07 2007
@@ -129,18 +129,18 @@
throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
FaultManager::checkForLastMsgNumberExceeded");
+/*
+ * TODO - This code currently doesn't actually work
Sequence sequence = (Sequence)
applicationRMMessage.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
long messageNumber =
sequence.getMessageNumber().getMessageNumber();
String sequenceID = sequence.getIdentifier().getIdentifier();
- SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
-
boolean lastMessageNumberExceeded = false;
String reason = null;
- SequencePropertyBean lastMessageBean =
seqPropMgr.retrieve(sequenceID,
-
Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO);
- if (lastMessageBean != null) {
- long lastMessageNo =
Long.parseLong(lastMessageBean.getValue());
+
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+ if (rmsBean != null) {
+ long lastMessageNo = rmsBean.getLastOutMessage();
if (messageNumber > lastMessageNo) {
lastMessageNumberExceeded = true;
reason =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberLargerThanLastMsg,
Long
@@ -172,7 +172,7 @@
log.debug("Exit:
FaultManager::checkForLastMsgNumberExceeded, lastMessageNumberExceeded");
getFault(applicationRMMessage, faultData,
storageManager);
}
-
+*/
if (log.isDebugEnabled())
log.debug("Exit:
FaultManager::checkForLastMsgNumberExceeded");
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=493983&r1=493982&r2=493983
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Jan 8 00:42:07 2007
@@ -58,7 +58,6 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.Handler;
-import org.apache.axis2.util.UUIDGenerator;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -111,7 +110,7 @@
*/
public static String getUUID() {
// String uuid = "uuid:" + UUIDGenerator.getUUID();
- String uuid = UUIDGenerator.getUUID();
+ String uuid = org.apache.axiom.om.util.UUIDGenerator.getUUID();
return uuid;
}
@@ -760,6 +759,18 @@
return bean;
}
+ public static final RMSBean getRMSBeanFromSequenceId(StorageManager
storageManager, String sequenceID)
+
+ throws SandeshaException {
+ RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
+ RMSBean bean = new RMSBean();
+ bean.setSequenceID(sequenceID);
+
+ bean = rmsBeanMgr.findUnique(bean);
+
+ return bean;
+ }
+
public static String getSequenceIDFromInternalSequenceID(String
internalSequenceID,
StorageManager storageManager) throws SandeshaException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]