Author: gatfora
Date: Wed Feb 21 02:55:08 2007
New Revision: 509965
URL: http://svn.apache.org/viewvc?view=rev&rev=509965
Log:
Modify SequenceProcessor so that ACKS are sent to non anonymous acks-to
addresses when processing a duplicate message
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Wed Feb 21 02:55:08 2007
@@ -223,37 +223,32 @@
if((replyTo==null || replyTo.hasAnonymousAddress()) &&
(specVersion!=null &&
specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
- SenderBeanMgr senderBeanMgr =
storageManager.getSenderBeanMgr();
- SenderBean findSenderBean = new SenderBean ();
-
findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
-
findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
-
findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
- findSenderBean.setSend(true);
+ SenderBeanMgr senderBeanMgr =
storageManager.getSenderBeanMgr();
+ SenderBean findSenderBean = new SenderBean ();
+
findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+
findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+
findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+ findSenderBean.setSend(true);
- SenderBean replyMessageBean =
senderBeanMgr.findUnique(findSenderBean);
+ SenderBean replyMessageBean =
senderBeanMgr.findUnique(findSenderBean);
- // this is effectively a poll for the replyMessage,
wo re-use the logic in the MakeConnection
- // processor. This will use this thread to re-send
the reply, writing it into the transport.
- // As the reply is now written we do not want to
continue processing, or suspend, so we abort.
- if(replyMessageBean != null) {
- if(log.isDebugEnabled()) log.debug("Found
matching reply for replayed message");
- MakeConnectionProcessor.replyToPoll(rmMsgCtx,
replyMessageBean, storageManager, false, null);
+ // this is effectively a poll for the replyMessage,
wo re-use the logic in the MakeConnection
+ // processor. This will use this thread to re-send
the reply, writing it into the transport.
+ // As the reply is now written we do not want to
continue processing, or suspend, so we abort.
+ if(replyMessageBean != null) {
+ if(log.isDebugEnabled()) log.debug("Found
matching reply for replayed message");
+ MakeConnectionProcessor.replyToPoll(rmMsgCtx,
replyMessageBean, storageManager, false, null);
result = InvocationResponse.ABORT;
if (log.isDebugEnabled())
log.debug("Exit:
SequenceProcessor::processReliableMessage, replayed message: " + result);
return result;
- }
- }
+ }
+ }
+
EndpointReference acksTo = new EndpointReference
(bean.getAcksToEPR());
- if (acksTo.hasAnonymousAddress()) {
- RMMsgContext ackRMMsgContext =
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId,
storageManager,false,true);
-
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Constants.VALUE_TRUE);
-
AcknowledgementManager.sendAckNow(ackRMMsgContext);
- result = InvocationResponse.ABORT;
- if (log.isDebugEnabled())
- log.debug("Exit:
SequenceProcessor::processReliableMessage, acking duplicate message: " +
result);
- return result;
- }
+
+ // Send an Ack if needed.
+ sendAckIfNeeded(sequenceId, rmMsgCtx, storageManager,
true, acksTo.hasAnonymousAddress());
result = InvocationResponse.ABORT;
if (log.isDebugEnabled())
@@ -383,59 +378,28 @@
}
- public static void sendAckIfNeeded(RMMsgContext rmMsgCtx,
StorageManager storageManager, boolean serverSide)
+ private static void sendAckIfNeeded(String sequenceId, RMMsgContext
rmMsgCtx,
+ StorageManager storageManager, boolean serverSide,
boolean anonymousAcksTo)
throws AxisFault {
if (log.isDebugEnabled())
- log.debug("Enter: SequenceProcessor::sendAckIfNeeded");
-
- Sequence sequence = (Sequence) rmMsgCtx
-
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
- if(sequence!=null){
- String sequenceId =
sequence.getIdentifier().getIdentifier();
- ConfigurationContext configCtx =
rmMsgCtx.getMessageContext()
- .getConfigurationContext();
- if (configCtx == null) {
- String message = SandeshaMessageHelper
-
.getMessage(SandeshaMessageKeys.configContextNotSet);
- if (log.isDebugEnabled())
- log.debug(message);
- throw new SandeshaException(message);
- }
+ log.debug("Enter: SequenceProcessor::sendAckIfNeeded "
+ sequenceId);
RMMsgContext ackRMMsgCtx =
AcknowledgementManager.generateAckMessage(
rmMsgCtx , sequenceId, storageManager,
false, serverSide);
- MessageContext ackMsgCtx =
ackRMMsgCtx.getMessageContext();
-
- EndpointReference acksTo = ackRMMsgCtx.getTo();
- EndpointReference replyTo = rmMsgCtx.getReplyTo();
- boolean anonAck = (acksTo == null) ||
acksTo.hasAnonymousAddress();
- boolean anonReply = (replyTo == null) ||
replyTo.hasAnonymousAddress();
-
- // Only use the backchannel for ack messages if we are
sure that the
- // application
- // doesn't need it. A 1-way MEP should be complete by
now.
- boolean complete =
ackMsgCtx.getOperationContext().isComplete();
- if (anonAck && anonReply && !complete) {
- if (log.isDebugEnabled())
- log
- .debug("Exit:
SequenceProcessor::sendAckIfNeeded, avoiding using backchannel");
- return;
- }
-
- long ackInterval = SandeshaUtil.getPropertyBean(
-
rmMsgCtx.getMessageContext().getAxisService())
- .getAcknowledgementInterval();
- long timeToSend = System.currentTimeMillis() +
ackInterval;
- if (anonAck) {
+ if (anonymousAcksTo) {
+
rmMsgCtx.getMessageContext().getOperationContext().
+
setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
AcknowledgementManager.sendAckNow(ackRMMsgCtx);
- } else if (!anonAck) {
+ } else {
+ long ackInterval = SandeshaUtil.getPropertyBean(
+
rmMsgCtx.getMessageContext().getAxisService())
+ .getAcknowledgementInterval();
+ long timeToSend = System.currentTimeMillis() +
ackInterval;
AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend,
storageManager);
}
- }
if (log.isDebugEnabled())
log.debug("Exit: SequenceProcessor::sendAckIfNeeded");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
Wed Feb 21 02:55:08 2007
@@ -268,6 +268,7 @@
result.append(this.getClass().getName());
result.append("\nSequence Id : "); result.append(sequenceID);
result.append("\nInternal Seq Id: ");
result.append(internalSequenceID);
+ result.append("\nTo : "); result.append(toAddress);
result.append("\nMessage Number : ");
result.append(messageNumber);
result.append("\nMessage Type : ");
result.append(messageType);
result.append("\nMessage Key : ");
result.append(messageContextRefKey);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]