Author: gatfora
Date: Thu Jan 11 06:48:34 2007
New Revision: 495242
URL: http://svn.apache.org/viewvc?view=rev&rev=495242
Log:
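Move the clientCompletedMessages and terminateAdded properties to RMSBean, and
the terminated property to RMSequenceBean, replacing the corresponding
SequencePropertyBean entries (CLIENT_COMPLETED_MESSAGES, TERMINATE_ADDED,
SEQUENCE_TERMINATED)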
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
Thu Jan 11 06:48:34 2007
@@ -253,13 +253,7 @@
// property. This is used as the
// sequenceId to share data b/w
// sequences
-
- //For incoming sequences this gives the msg no's of the
messages that were
- //received (may be an ack was sent - depending on the policy)
- //For out going sequences this gives the messages that were
sent and that were successfully
- //acked by the other end point.
- String CLIENT_COMPLETED_MESSAGES = "ClientCompletedMessages";
-
+
//For IN_ORDER sequences, we can have finite ranges of messages
that can be
//delivered out of order. These are maintained as a String that
is consistent
//with the form described in
org.apache.sandesha2.util.RangeString
@@ -270,8 +264,6 @@
String INCOMING_SEQUENCE_LIST = "IncomingSequenceList";
String OFFERED_SEQUENCE = "OfferedSequence";
-
- String TERMINATE_ADDED = "TerminateAdded";
String TERMINATE_RECEIVED = "TerminateReceived";
@@ -284,8 +276,6 @@
String SEQUENCE_CLOSED = "SequenceClosed";
String SEQUENCE_CLOSED_CLIENT = "SequenceClosedClient";
//indicates the client has sent a close sequence
-
- String SEQUENCE_TERMINATED = "SequenceTerminated";
String SEQUENCE_TIMED_OUT = "SequenceTimedOut";
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
Thu Jan 11 06:48:34 2007
@@ -50,7 +50,6 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.RMDBean;
@@ -126,17 +125,17 @@
// if data not is available sequence has to be
terminated or
// timedOut.
- if (rMSBean == null) {
+ if (rMSBean != null && rMSBean.isTerminated()) {
// check weather this is an terminated sequence.
- if (isSequenceTerminated(internalSequenceID,
seqPropMgr)) {
-
fillTerminatedOutgoingSequenceInfo(sequenceReport, internalSequenceID,
seqPropMgr);
+
fillTerminatedOutgoingSequenceInfo(sequenceReport, internalSequenceID,
storageManager);
- return sequenceReport;
- }
+ return sequenceReport;
+
+ } else if (rMSBean == null) {
if (isSequenceTimedout(internalSequenceID,
seqPropMgr)) {
-
fillTimedoutOutgoingSequenceInfo(sequenceReport, internalSequenceID,
seqPropMgr);
+
fillTimedoutOutgoingSequenceInfo(sequenceReport, internalSequenceID,
storageManager);
return sequenceReport;
}
@@ -166,7 +165,7 @@
}
sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
- fillOutgoingSequenceInfo(sequenceReport,
internalSequenceID, outSequenceID, seqPropMgr);
+ fillOutgoingSequenceInfo(sequenceReport,
internalSequenceID, outSequenceID, storageManager);
} catch (Exception e) {
if (reportTransaction!=null) {
@@ -371,13 +370,11 @@
SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
- boolean terminatedSequence = false;
-
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
//see if the sequence is terminated
- SequencePropertyBean sequenceTerminated =
seqPropMgr.retrieve(internalSequenceId,
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- if(sequenceTerminated!=null){
+ boolean terminatedSequence = false;
+ if (rmsBean != null && rmsBean.isTerminated())
terminatedSequence = true;
- }
//see if the sequence is timed out
SequencePropertyBean sequenceTimedout =
seqPropMgr.retrieve(internalSequenceId,
Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
@@ -399,6 +396,8 @@
// Find all properties which have a matching
internal sequence id
removeBeans(sequenceId, seqPropMgr);
removeBeans(internalSequenceId, seqPropMgr);
+ // Delete the rmsBean
+
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
}
} catch (SandeshaException e) {
@@ -966,32 +965,6 @@
return dummyEnvelope;
}
- private static boolean isSequenceTerminated(String internalSequenceID,
SequencePropertyBeanMgr seqPropMgr)
- throws SandeshaException {
- SequencePropertyBean internalSequenceFindBean = new
SequencePropertyBean();
- internalSequenceFindBean.setValue(internalSequenceID);
-
internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-
- SequencePropertyBean internalSequenceBean =
seqPropMgr.findUnique(internalSequenceFindBean);
- if (internalSequenceBean == null) {
- String message = SandeshaMessageHelper.getMessage(
-
SandeshaMessageKeys.internalSeqBeanNotAvailableOnSequence, internalSequenceID);
- log.debug(message);
-
- return false;
- }
-
- String outSequenceID =
internalSequenceBean.getSequencePropertyKey();
-
- SequencePropertyBean sequenceTerminatedBean =
seqPropMgr.retrieve(outSequenceID,
-
Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
- if (sequenceTerminatedBean != null &&
Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
- return true;
- }
-
- return false;
- }
-
private static boolean isSequenceTimedout(String internalSequenceID,
SequencePropertyBeanMgr seqPropMgr)
throws SandeshaException {
SequencePropertyBean internalSequenceFindBean = new
SequencePropertyBean();
@@ -1018,12 +991,12 @@
}
private static void fillTerminatedOutgoingSequenceInfo(SequenceReport
report, String internalSequenceID,
- SequencePropertyBeanMgr seqPropMgr) throws
SandeshaException {
+ StorageManager storageManager) throws SandeshaException
{
SequencePropertyBean internalSequenceFindBean = new
SequencePropertyBean();
internalSequenceFindBean.setValue(internalSequenceID);
internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- SequencePropertyBean internalSequenceBean =
seqPropMgr.findUnique(internalSequenceFindBean);
+ SequencePropertyBean internalSequenceBean =
storageManager.getSequencePropertyBeanMgr().findUnique(internalSequenceFindBean);
if (internalSequenceBean == null) {
String message = SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.notValidTerminate,
internalSequenceID);
@@ -1035,16 +1008,16 @@
report.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_TERMINATED);
String outSequenceID =
internalSequenceBean.getSequencePropertyKey();
- fillOutgoingSequenceInfo(report, internalSequenceID,
outSequenceID, seqPropMgr);
+ fillOutgoingSequenceInfo(report, internalSequenceID,
outSequenceID, storageManager);
}
private static void fillTimedoutOutgoingSequenceInfo(SequenceReport
report, String internalSequenceID,
- SequencePropertyBeanMgr seqPropMgr) throws
SandeshaException {
+ StorageManager storageManager) throws SandeshaException
{
SequencePropertyBean internalSequenceFindBean = new
SequencePropertyBean();
internalSequenceFindBean.setValue(internalSequenceID);
internalSequenceFindBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- SequencePropertyBean internalSequenceBean =
seqPropMgr.findUnique(internalSequenceFindBean);
+ SequencePropertyBean internalSequenceBean =
storageManager.getSequencePropertyBeanMgr().findUnique(internalSequenceFindBean);
if (internalSequenceBean == null) {
String message = SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.notValidTimeOut,
internalSequenceID);
@@ -1055,23 +1028,23 @@
report.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_TIMED_OUT);
String outSequenceID =
internalSequenceBean.getSequencePropertyKey();
- fillOutgoingSequenceInfo(report, internalSequenceID,
outSequenceID, seqPropMgr);
+ fillOutgoingSequenceInfo(report, internalSequenceID,
outSequenceID, storageManager);
}
private static void fillOutgoingSequenceInfo(SequenceReport report,
String internalSequenceID, String outSequenceID,
- SequencePropertyBeanMgr seqPropMgr) throws
SandeshaException {
+ StorageManager storageManager) throws SandeshaException
{
report.setSequenceID(outSequenceID);
- ArrayList completedMessageList =
AcknowledgementManager.getClientCompletedMessagesList(internalSequenceID,
outSequenceID,
- seqPropMgr);
+ List completedMessageList =
AcknowledgementManager.getClientCompletedMessagesList(internalSequenceID,
outSequenceID,
+ storageManager);
Iterator iter = completedMessageList.iterator();
while (iter.hasNext()) {
- Long lng = new Long(Long.parseLong((String)
iter.next()));
- report.addCompletedMessage(lng);
+ report.addCompletedMessage((Long)iter.next());
}
- SequencePropertyBean tokenBean =
seqPropMgr.retrieve(internalSequenceID,
Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+ SequencePropertyBean tokenBean =
+
storageManager.getSequencePropertyBeanMgr().retrieve(internalSequenceID,
Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
if(tokenBean != null) report.setSecureSequence(true);
}
@@ -1080,9 +1053,8 @@
SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
- SequencePropertyBean terminatedBean =
seqPropMgr.retrieve(sequenceID,
-
Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED);
- if (terminatedBean != null) {
+ RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
+ if (rmdBean != null && rmdBean.isTerminated()) {
return SequenceReport.SEQUENCE_STATUS_TERMINATED;
}
@@ -1092,10 +1064,7 @@
return SequenceReport.SEQUENCE_STATUS_TIMED_OUT;
}
- RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
- RMDBean rMDBean = nextMsgMgr.retrieve(sequenceID);
-
- if (rMDBean != null) {
+ if (rmdBean != null) {
return SequenceReport.SEQUENCE_STATUS_ESTABLISHED;
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Thu Jan 11 06:48:34 2007
@@ -212,24 +212,15 @@
// setting the completed_messages list. This gives all the
messages of
// the sequence that were acked.
- SequencePropertyBean allCompletedMsgsBean =
seqPropMgr.retrieve(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
- if (allCompletedMsgsBean == null) {
- allCompletedMsgsBean = new SequencePropertyBean();
-
allCompletedMsgsBean.setSequencePropertyKey(sequencePropertyKey);
-
allCompletedMsgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
- seqPropMgr.insert(allCompletedMsgsBean);
- }
-
- String str = ackedMessagesList.toString();
- allCompletedMsgsBean.setValue(str);
-
- seqPropMgr.update(allCompletedMsgsBean);
-
- RMSBean bean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
+ // Set the completed message list
+ rmsBean.setClientCompletedMessages(ackedMessagesList);
+
+ long highestOutMsgNo = rmsBean.getLastOutMessage();
- long highestOutMsgNo = bean.getLastOutMessage();
+ // Update the RMSBean
+ storageManager.getRMSBeanMgr().update(rmsBean);
if (highestOutMsgNo > 0) {
boolean complete =
AcknowledgementManager.verifySequenceCompletion(sequenceAck
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Thu Jan 11 06:48:34 2007
@@ -201,10 +201,10 @@
if(sequenceClosed!=null){
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceClosed,
internalSequenceId));
}
-
+
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
//see if the sequence is terminated
- SequencePropertyBean sequenceTerminated =
seqPropMgr.retrieve(internalSequenceId,
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- if(sequenceTerminated!=null){
+ if(rmsBean != null && rmsBean.isTerminateAdded()) {
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTerminated,
internalSequenceId));
}
@@ -214,10 +214,6 @@
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout,
internalSequenceId));
}
- boolean sendCreateSequence = false;
-
- RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
-
// FINDING THE SPEC VERSION
String specVersion = null;
if (msgContext.isServerSide()) {
@@ -264,6 +260,8 @@
specVersion =
SpecSpecificConstants.getDefaultSpecVersion();
String outSequenceID = null;
+
+ boolean sendCreateSequence = false;
if (rmsBean == null) { // out sequence will be set for the
// server side, in the case of an offer.
sendCreateSequence = true; // message number being one
and not
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Thu Jan 11 06:48:34 2007
@@ -151,8 +151,6 @@
SandeshaUtil.startPollingManager(configCtx);
}
}
-
- createSeqMgr.update(rmsBean);
SenderBean createSequenceSenderBean =
retransmitterMgr.retrieve(createSeqMsgId);
if (createSequenceSenderBean == null)
@@ -233,11 +231,7 @@
RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
rmdBeanMgr.insert(rMDBean);
- SequencePropertyBean msgsBean = new
SequencePropertyBean();
- msgsBean.setSequencePropertyKey(offeredSequenceId);
-
msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
- msgsBean.setValue("");
- sequencePropMgr.insert(msgsBean);
+ createSeqMgr.update(rmsBean);
// Store the security token for the offered sequence
if(tokenData != null) {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Thu Jan 11 06:48:34 2007
@@ -118,12 +118,11 @@
setUpHighestMsgNumbers(context,
storageManager,sequencePropertyKey, sequenceId, terminateSeqRMMsg);
- TerminateManager.cleanReceivingSideOnTerminateMessage(context,
sequencePropertyKey, sequenceId, storageManager);
-
- SequencePropertyBean terminatedBean = new
SequencePropertyBean(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,
Sandesha2Constants.VALUE_TRUE);
+ RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+ rmdBean.setTerminated(true);
+ storageManager.getRMDBeanMgr().update(rmdBean);
- sequencePropertyBeanMgr.insert(terminatedBean);
+ TerminateManager.cleanReceivingSideOnTerminateMessage(context,
sequencePropertyKey, sequenceId, storageManager);
SequenceManager.updateLastActivatedTime(sequencePropertyKey,
storageManager);
@@ -273,8 +272,6 @@
log.debug("Exit:
TerminateSeqMsgProcessor::addTerminateSequenceResponse");
return terminateSeqResponseRMMsg;
-
-
}
public boolean processOutMessage(RMMsgContext rmMsgCtx) throws
AxisFault {
@@ -285,11 +282,10 @@
// Get the parent processor to setup the out message
setupOutMessage(rmMsgCtx);
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(getStorageManager(),
getInternalSequenceID());
+
// Check if the sequence is already terminated (stored on the
internal sequenceid)
- String terminated =
SandeshaUtil.getSequenceProperty(getInternalSequenceID(),
-
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, getStorageManager());
-
- if (terminated != null && "true".equals(terminated)) {
+ if (rmsBean.isTerminateAdded()) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
log.debug(message);
if (log.isDebugEnabled())
@@ -317,12 +313,10 @@
rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(getRMVersion()));
rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(getRMVersion()));
- SequencePropertyBean terminateAdded = new
SequencePropertyBean();
-
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- terminateAdded.setSequencePropertyKey(getInternalSequenceID());
- terminateAdded.setValue("true");
+ rmsBean.setTerminateAdded(true);
-
getStorageManager().getSequencePropertyBeanMgr().insert(terminateAdded);
+ // Update the RMSBean with the terminate added flag
+ getStorageManager().getRMSBeanMgr().update(rmsBean);
// Send the outgoing message
// Set a retransmitter lastSentTime so that terminate will be
send with
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
Thu Jan 11 06:48:34 2007
@@ -97,7 +97,7 @@
}
private void pollRMSSide() throws AxisFault {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollRMSSide");
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::pollRMSSide");
RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
RMSBean findRMS = new RMSBean();
@@ -119,7 +119,7 @@
}
private void pollRMDSide() throws AxisFault {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollRMDSide");
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::pollRMDSide");
//geting the sequences to be polled.
//if shedule contains any requests, do the earliest one.
//else pick one randomly.
@@ -143,18 +143,16 @@
RMDBean nextMsgBean = (RMDBean) results.get(rmdIndex++);
pollForSequence(nextMsgBean.getSequenceID(),
nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollRMDSide");
+ if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollRMDSide");
}
private void pollForSequence(String sequenceId, String
sequencePropertyKey, String referenceMsgKey, RMSequenceBean rmBean) throws
SandeshaException, SandeshaStorageException, AxisFault {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey +
", " + referenceMsgKey + ", " + rmBean);
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey +
", " + referenceMsgKey + ", " + rmBean);
// Don't poll for a terminated sequence
// TODO once the 'terminated' flag is a property on the RMS /
RMD bean, we should
// be able to filter out terminated sequences before we get
here.
- String terminated =
SandeshaUtil.getSequenceProperty(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED, storageManager);
- if(terminated != null && "true".equals(terminated)) {
+ if(rmBean.isTerminated()) {
if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollForSequence, already terminated");
return;
}
@@ -207,7 +205,7 @@
}
private synchronized String getNextSheduleEntry () {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::getNextSheduleEntry");
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::getNextSheduleEntry");
String sequenceId = null;
if (sheduledPollingRequests.size()>0) {
@@ -230,7 +228,7 @@
* @throws SandeshaException
*/
public synchronized void start (ConfigurationContext
configurationContext) throws SandeshaException {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::start");
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::start");
this.configurationContext = configurationContext;
this.sheduledPollingRequests = new HashMap ();
@@ -246,19 +244,19 @@
*
*/
public synchronized void stopPolling () {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::stopPolling");
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::stopPolling");
setPoll(false);
if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::stopPolling");
}
public synchronized void setPoll (boolean poll) {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::setPoll");
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::setPoll");
this.poll = poll;
if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::setPoll");
}
public synchronized boolean isPoll () {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::isPoll");
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::isPoll");
if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::isPoll");
return poll;
}
@@ -274,7 +272,7 @@
* @param sequenceId
*/
public synchronized void shedulePollingRequest (String sequenceId) {
- if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::shedulePollingRequest, " + sequenceId);
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::shedulePollingRequest, " + sequenceId);
if (sheduledPollingRequests.containsKey (sequenceId)) {
Integer sequenceEntryCount = (Integer)
sheduledPollingRequests.get(sequenceId);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
Thu Jan 11 06:48:34 2007
@@ -43,6 +43,9 @@
private String highestInMessageId;
+ /** For incoming sequences this gives the msg no's of the messages that
were
+ * received (may be an ack was sent - depending on the policy)
+ */
private List serverCompletedMessages = null;
public RMDBean() {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
Thu Jan 11 06:48:34 2007
@@ -17,6 +17,8 @@
package org.apache.sandesha2.storage.beans;
+import java.util.List;
+
/**
* This bean is used at the sending side (of both server and client)
* There is on object of this for each sequence.
@@ -92,6 +94,17 @@
* The next sequence number to apply to the message
*/
private long nextMessageNumber = -1;
+
+ /**
+ * For out going sequences this gives the messages that were sent and
that were successfully
+ * acked by the other end point.
+ */
+ private List clientCompletedMessages = null;
+
+ /**
+ * Indicates that a terminate sequence message was added.
+ */
+ private boolean terminateAdded = false;
public RMSBean() {
}
@@ -193,6 +206,25 @@
this.nextMessageNumber = nextMessageNumber;
}
+ public List getClientCompletedMessages() {
+ return clientCompletedMessages;
+ }
+
+
+ public void setClientCompletedMessages(List clientCompletedMessages) {
+ this.clientCompletedMessages = clientCompletedMessages;
+ }
+
+
+ public boolean isTerminateAdded() {
+ return terminateAdded;
+ }
+
+
+ public void setTerminateAdded(boolean terminateAdded) {
+ this.terminateAdded = terminateAdded;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
@@ -206,6 +238,8 @@
result.append("\nHighestOutMessage: ");
result.append(highestOutMessageNumber);
result.append("\nHighestOutRelatesTo:
");result.append(highestOutRelatesTo);
result.append("\nNextMessageNumber: ");
result.append(nextMessageNumber);
+ result.append("\nTerminateAdded : ");
result.append(terminateAdded);
return result.toString();
}
+
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
Thu Jan 11 06:48:34 2007
@@ -36,6 +36,11 @@
private String replyToEPR;
private String acksToEPR;
+
+ /**
+ * Indicates that a sequence is terminated
+ */
+ private boolean terminated = false;
/**
* This tells weather this sequence is in the polling mode or not.
@@ -99,6 +104,15 @@
this.pollingMode = pollingMode;
}
+ public boolean isTerminated() {
+ return terminated;
+ }
+
+
+ public void setTerminated(boolean terminated) {
+ this.terminated = terminated;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append("\nSequence Id: "); result.append(sequenceID);
@@ -106,6 +120,7 @@
result.append("\nreplyToEPR : "); result.append(replyToEPR);
result.append("\nacksToEPR : "); result.append(acksToEPR);
result.append("\nPolling : "); result.append(pollingMode);
+ result.append("\nTerminated : ");
result.append(terminated);
return result.toString();
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
Thu Jan 11 06:48:34 2007
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import javax.xml.namespace.QName;
@@ -42,10 +43,9 @@
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -159,29 +159,21 @@
* @param outGoingMessage
* @return
*/
- public static ArrayList getClientCompletedMessagesList(String
internalSequenceID, String sequenceID, SequencePropertyBeanMgr seqPropMgr)
+ public static List getClientCompletedMessagesList(String
internalSequenceID, String sequenceID, StorageManager storageManager)
throws SandeshaException {
if (log.isDebugEnabled())
log.debug("Enter:
AcknowledgementManager::getClientCompletedMessagesList " + internalSequenceID +
", " + sequenceID);
- SequencePropertyBean completedMessagesBean = null;
- if (internalSequenceID != null)
- completedMessagesBean =
seqPropMgr.retrieve(internalSequenceID,
-
Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-
- if (completedMessagesBean == null)
- completedMessagesBean = seqPropMgr.retrieve(sequenceID,
-
Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
-
- ArrayList completedMsgList = null;
- if (completedMessagesBean != null) {
- completedMsgList =
SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
- } else {
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+
+ if (rmsBean == null) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull,
sequenceID);
SandeshaException e = new SandeshaException(message);
if(log.isDebugEnabled()) log.debug("Throwing
exception", e);
throw e;
}
+
+ List completedMsgList = rmsBean.getClientCompletedMessages();
if (log.isDebugEnabled())
log.debug("Exit:
AcknowledgementManager::getClientCompletedMessagesList");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Thu Jan 11 06:48:34 2007
@@ -883,20 +883,19 @@
SequencePropertyBean sequencePropertyBean =
sequencePropertyBeanMgr.retrieve(id, name);
if (sequencePropertyBean == null)
return null;
- else
- return sequencePropertyBean.getValue();
+
+ return sequencePropertyBean.getValue();
}
public static boolean isAllMsgsAckedUpto(long highestInMsgNo, String
sequencePropertyKey,
StorageManager storageManager) throws SandeshaException
{
- String clientCompletedMessages =
getSequenceProperty(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES,
storageManager);
- ArrayList ackedMsgsList =
getArrayListFromString(clientCompletedMessages);
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
sequencePropertyKey);
+ List ackedMsgsList = rmsBean.getClientCompletedMessages();
long smallestMsgNo = 1;
for (long tempMsgNo = smallestMsgNo; tempMsgNo <=
highestInMsgNo; tempMsgNo++) {
- if (!ackedMsgsList.contains(new
Long(tempMsgNo).toString()))
+ if (!ackedMsgsList.contains(new Long(tempMsgNo)))
return false;
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Thu Jan 11 06:48:34 2007
@@ -263,12 +263,8 @@
}
- SequencePropertyBean msgsBean = new SequencePropertyBean();
- msgsBean.setSequencePropertyKey(sequencePropertyKey);
-
msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
- msgsBean.setValue("");
-
- seqPropMgr.insert(msgsBean);
+ // New up the client completed messages list
+ rmsBean.setClientCompletedMessages(new ArrayList());
// saving transportTo value;
String transportTo = (String)
firstAplicationMsgCtx.getProperty(Constants.Configuration.TRANSPORT_URL);
@@ -437,54 +433,4 @@
return sequenceTimedOut;
}
-
- public static long getOutGoingSequenceAckedMessageCount(String
sequencePropertyKey, StorageManager storageManager)
- throws SandeshaException {
- // / Transaction transaction = storageManager.getTransaction();
- SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean ackedMsgBean =
seqPropBeanMgr.retrieve(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
- if (ackedMsgBean == null)
- return 0; // No acknowledgement has been received yet.
-
- long noOfMessagesAcked =
Long.parseLong(ackedMsgBean.getValue());
- // / transaction.commit();
-
- return noOfMessagesAcked;
- }
-
- public static boolean isOutGoingSequenceCompleted(String
internalSequenceID, StorageManager storageManager)
- throws SandeshaException {
- // / Transaction transaction = storageManager.getTransaction();
- SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean terminateAddedBean =
seqPropBeanMgr.retrieve(internalSequenceID,
-
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- if (terminateAddedBean == null)
- return false;
-
- if ("true".equals(terminateAddedBean.getValue()))
- return true;
-
- return false;
- }
-
- public static boolean isIncomingSequenceCompleted(String sequenceID,
StorageManager storageManager)
- throws SandeshaException {
-
- // / Transaction transaction = storageManager.getTransaction();
- SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean terminateReceivedBean =
seqPropBeanMgr.retrieve(sequenceID,
-
Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
- boolean complete = false;
-
- if (terminateReceivedBean != null &&
"true".equals(terminateReceivedBean.getValue()))
- complete = true;
-
- // / transaction.commit();
- return complete;
- }
-
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=495242&r1=495241&r2=495242
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Thu Jan 11 06:48:34 2007
@@ -35,7 +35,6 @@
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -188,12 +187,11 @@
public static void terminateSendingSide(ConfigurationContext
configContext, String sequencePropertyKey, String internalSequenceID,
boolean serverSide, StorageManager storageManager)
throws SandeshaException {
- SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean seqTerminatedBean = new
SequencePropertyBean(internalSequenceID,
-
Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,
Sandesha2Constants.VALUE_TRUE);
- seqPropMgr.insert(seqTerminatedBean);
-
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceID);
+ // Indicate that the sequence is terminated
+ rmsBean.setTerminated(true);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
cleanSendingSideData (configContext, sequencePropertyKey ,
internalSequenceID, serverSide, storageManager);
}
@@ -202,14 +200,6 @@
boolean addEntryWithSequenceID = false;
- if
(propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES))
{
- addEntryWithSequenceID = true;
- }
-
- if
(propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED))
{
- addEntryWithSequenceID = true;
- }
-
if
(propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED))
{
addEntryWithSequenceID = true;
}
@@ -239,22 +229,12 @@
private static boolean isPropertyDeletable(String name) {
boolean deleatable = true;
- if
(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED.equals(name))
- deleatable = false;
-
if
(Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED.equals(name))
deleatable = false;
if
(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
deleatable = false;
- // if
- //
(Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name))
- // deleatable = false;
-
- if
(Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED.equals(name))
- deleatable = false;
-
if
(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED.equals(name))
deleatable = false;
@@ -280,7 +260,6 @@
SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropertyBeanMgr();
SenderBeanMgr retransmitterBeanMgr =
storageManager.getSenderBeanMgr();
- RMSBeanMgr rMSBeanMgr = storageManager.getRMSBeanMgr();
// removing retransmitterMgr entries and corresponding message
contexts.
Collection collection =
retransmitterBeanMgr.find(internalSequenceId);
@@ -298,7 +277,7 @@
createSeqFindBean.setInternalSequenceID(internalSequenceId);
RMSBean rMSBean =
storageManager.getRMSBeanMgr().findUnique(createSeqFindBean);
- rMSBeanMgr.delete(rMSBean.getCreateSeqMsgID());
+ //rMSBeanMgr.delete(rMSBean.getCreateSeqMsgID());
String outSequenceID = rMSBean.getSequenceID();
@@ -326,10 +305,9 @@
SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
- SequencePropertyBean terminated =
seqPropMgr.retrieve(internalSequenceID,
-
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceID);
- if (terminated != null && terminated.getValue() != null &&
"true".equals(terminated.getValue())) {
+ if (rmsBean.isTerminateAdded()) {
if(log.isDebugEnabled())
log.debug("Exit:
TerminateManager::addTerminateSequenceMessage - terminate was added
previously.");
return;
@@ -353,8 +331,6 @@
toEPR = new EndpointReference (endpointBean.getValue());
}
- RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceID);
-
if (toEPR==null) {
if (rmsBean.getToEPR()!=null) {
@@ -412,12 +388,9 @@
if (to!=null)
terminateBean.setToAddress(to.getAddress());
- SequencePropertyBean terminateAdded = new
SequencePropertyBean();
-
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- terminateAdded.setSequencePropertyKey(internalSequenceID);
- terminateAdded.setValue("true");
+ rmsBean.setTerminateAdded(true);
- seqPropMgr.insert(terminateAdded);
+ storageManager.getRMSBeanMgr().update(rmsBean);
terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,
Sandesha2Constants.VALUE_TRUE);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]