Author: amilas
Date: Tue Oct 28 23:35:47 2008
New Revision: 708804
URL: http://svn.apache.org/viewvc?rev=708804&view=rev
Log:
apply the patch for SANDESHA2-179
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=708804&r1=708803&r2=708804&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Tue Oct 28 23:35:47 2008
@@ -103,15 +103,15 @@
log.debug("Exit:
AcknowledgementProcessor::processAckHeader, Sequence bean not found");
return;
}
-
+
if (outSequenceId == null || outSequenceId.length()==0) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outSeqIDIsNull);
log.debug(message);
throw new SandeshaException(message);
}
- // If the message type is terminate sequence, then there may be
a piggy backed ACK for a
+ // If the message type is terminate sequence, then there may be
a piggy backed ACK for a
// sequence that has been terminated
- if
(rmMsgCtx.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ &&
+ if
(rmMsgCtx.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ &&
FaultManager.checkForSequenceTerminated(rmMsgCtx,
outSequenceId, rmsBean, piggybackedAck)) {
if (log.isDebugEnabled())
log.debug("Exit:
AcknowledgementProcessor::processAckHeader, Sequence terminated");
@@ -121,32 +121,44 @@
// Check that the sender of this Ack holds the correct token
String internalSequenceId = rmsBean.getInternalSequenceID();
SandeshaUtil.assertProofOfPossession(rmsBean, msgCtx,
soapHeader);
-
+
if(log.isDebugEnabled()) log.debug("Got Ack for RM Sequence: "
+ outSequenceId + ", internalSeqId: " + internalSequenceId);
Iterator ackRangeIterator =
sequenceAck.getAcknowledgementRanges().iterator();
-
+
if (FaultManager.checkForInvalidAcknowledgement(rmMsgCtx,
sequenceAck, storageManager, rmsBean, piggybackedAck)) {
if (log.isDebugEnabled())
log.debug("Exit:
AcknowledgementProcessor::processAckHeader, Invalid Ack range ");
return;
}
-
+
EndpointReference replyTo =
rmsBean.getReplyToEndpointReference();
boolean anonReplyTo = replyTo==null ||
replyTo.isWSAddressingAnonymous(); //if this is wsa anonymous
//then we might be using replay
-
+
String rmVersion = rmMsgCtx.getRMSpecVersion();
-
+
// Compare the clientCompletedMessages with the range we just
got, to work out if there
// is any new information in this ack message
RangeString completedMessages =
rmsBean.getClientCompletedMessages();
long numberOfNewMessagesAcked = 0;
-
+
boolean ackNeedsToSendInvalidFault = false; //if this ack
includes a msg that we have not sent then
//we should try to send a fault back to the client
Range firstInvalidRange = null; //If
there is a single invalid range then we set it here.
//If there is more than one we report the first invalid
range
+ //adding a MakeConnection for the response sequence if needed.
+ if (rmsBean.getOfferedSequence() != null) {
+
+ RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
+ RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
+
+ if (rMDBean!=null && rMDBean.isPollingMode()) {
+ PollingManager manager =
storageManager.getPollingManager();
+ if(manager != null)
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+ }
+ }
+
while(ackRangeIterator.hasNext()) {
Range ackRange = (Range) ackRangeIterator.next();
long lower = ackRange.lowerValue;
@@ -157,14 +169,14 @@
//we now know that this range is complete so we
update it. This should aggregate the
//ranges together and tell us which numbers are
newly acked
Range[] newRanges =
completedMessages.addRange(ackedRange).getRanges();
-
+
// We now take each newly acked message in turn
and see if we need to update a sender bean
for (int rangeIndex=0; rangeIndex <
newRanges.length; rangeIndex++) {
//now work on each newly acked message
in this range
for(long messageNo =
newRanges[rangeIndex].lowerValue; messageNo<=newRanges[rangeIndex].upperValue;
messageNo++){
-
+
numberOfNewMessagesAcked++;
- SenderBean retransmitterBean =
retransmitterMgr.retrieve(outSequenceId, messageNo);
+ SenderBean retransmitterBean =
retransmitterMgr.retrieve(outSequenceId, messageNo);
if (retransmitterBean != null
&&
retransmitterBean.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION)
{
// Check we haven't got
an Ack for an application message that hasn't been sent yet !
if
(retransmitterBean.getSentCount() == 0 ) {
@@ -180,7 +192,7 @@
//delete the
sender bean that has been validly acknowledged (unless
//we use replay
model)
String
storageKey = retransmitterBean.getMessageContextRefKey();
-
+
boolean
syncResponseNeeded = false;
if
(anonReplyTo) {
MessageContext applicationMessage =
storageManager.retrieveMessageContext(storageKey, configCtx);
@@ -196,14 +208,14 @@
//
removing the application message from the storage if there is no replay model
retransmitterMgr.delete(retransmitterBean.getMessageID());
storageManager.removeMessageContext(storageKey);
- }
+ }
}
}
}//end for
}//end for
} //end while
}
-
+
if(ackNeedsToSendInvalidFault){
//try to send an invalid ack
FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx,
sequenceAck, firstInvalidRange,
@@ -216,18 +228,6 @@
// updating the last activated time of the sequence.
rmsBean.setLastActivatedTime(System.currentTimeMillis());
- //adding a MakeConnection for the response sequence if needed.
- if (rmsBean.getOfferedSequence() != null) {
-
- RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
- RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
-
- if (rMDBean!=null && rMDBean.isPollingMode()) {
- PollingManager manager =
storageManager.getPollingManager();
- if(manager != null)
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
- }
- }
-
// We overwrite the previous client completed message ranges
with the
// latest view, but only if it is an update i.e. contained a new
// ack range (which is because we do not previous acks arriving
late
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=708804&r1=708803&r2=708804&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
Tue Oct 28 23:35:47 2008
@@ -82,7 +82,9 @@
if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
log.debug("Enter: WorkerLock::removeWork " + work);
Holder h = (Holder) locks.remove(work);
- h.release();
+ if (h != null){
+ h.release();
+ }
if (LoggingControl.isAnyTracingEnabled() &&
log.isDebugEnabled())
log.debug("Exit: WorkerLock::removeWork");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]