Author: chamikara
Date: Sat Nov 19 09:36:36 2005
New Revision: 345658
URL: http://svn.apache.org/viewcvs?rev=345658&view=rev
Log:
Async acks are now sent as standalone messages only after waiting for a given
wsp:acknowledgementInterval. Unwanted ack entries will be deleted (e.g. ack
1-2 (yet to be sent) will be deleted when adding ack 1-3).
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Sat Nov
19 09:36:36 2005
@@ -118,8 +118,8 @@
}
public interface WSP {
- long RETRANSMISSION_INTERVAL = 1000;
- long ACKNOWLEDGEMENT_INTERVAL = 3000;
+ long RETRANSMISSION_INTERVAL = 20000;
+ long ACKNOWLEDGEMENT_INTERVAL = 4000;
boolean EXPONENTION_BACKOFF = true;
long INACTIVITY_TIMEOUT_INTERVAL = 5000000;
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Sat Nov 19 09:36:36 2005
@@ -18,6 +18,9 @@
package org.apache.sandesha2.msgprocessors;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
import javax.xml.namespace.QName;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
@@ -34,9 +37,11 @@
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.axis2.soap.SOAPFactory;
+import org.apache.derby.tools.sysinfo;
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
@@ -55,6 +60,8 @@
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.wsdl.WSDLConstants;
+import com.sun.rsasign.p;
+
public class ApplicationMsgProcessor implements MsgProcessor {
private boolean letInvoke = false;
@@ -302,6 +309,8 @@
RMMsgContext ackRMMsgCtx = SandeshaUtil.deepCopy(rmMsgCtx);
MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+ ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
ackMsgCtx.setAxisServiceGroup(serviceGroup);
ackMsgCtx.setServiceGroupContext(serviceGroupContext);
ackMsgCtx.setAxisService(service);
@@ -389,7 +398,35 @@
ackBean.setReSend(false);
ackBean.setSend(true);
ackBean.setMessagetype(Constants.MessageTypes.ACK);
-
+
+ //the tempSequenceId value of the retransmitter Table
for the messages related to an incoming
+ //sequence is the actual sequence ID - TODO document
this.
+ ackBean.setTempSequenceId(sequenceId);
+
+ RMPolicyBean policyBean = (RMPolicyBean)
rmMsgCtx.getProperty(Constants.WSP.RM_POLICY_BEAN);
+ long ackInterval =
Constants.WSP.ACKNOWLEDGEMENT_INTERVAL;
+ if (policyBean!=null) {
+ ackInterval =
policyBean.getAcknowledgementInaterval();
+ }
+
+ //Ack will be sent as stand alone, only after the
retransmitter interval.
+ long timeToSend =
System.currentTimeMillis()+ackInterval;
+ ackBean.setTimeToSend(timeToSend);
+
+
+ //removing old acks.
+ RetransmitterBean findBean = new RetransmitterBean ();
+ findBean.setMessagetype(Constants.MessageTypes.ACK);
+ findBean.setTempSequenceId(sequenceId);
+ Collection coll = retransmitterBeanMgr.find(findBean);
+ Iterator it = coll.iterator();
+ while (it.hasNext()) {
+ RetransmitterBean retransmitterBean =
(RetransmitterBean) it.next();
+
retransmitterBeanMgr.delete(retransmitterBean.getMessageId());
+ }
+
+
+ //inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
SandeshaUtil.startSenderIfStopped(configCtx);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
Sat Nov 19 09:36:36 2005
@@ -115,13 +115,9 @@
temp = (RetransmitterBean) iterator.next();
if (temp.isSend()) {
- long timeToSend = temp.getTimeToSend();
-
- int count = temp.getSentCount();
-
+ long timeToSend = temp.getTimeToSend();
long timeNow = System.currentTimeMillis();
- if (count == 0
- || (timeNow >= timeToSend)) {
+ if ((timeNow >= timeToSend)) {
beans.add(temp);
}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
Sat Nov 19 09:36:36 2005
@@ -20,6 +20,7 @@
import org.apache.axis2.context.MessageContext;
import org.apache.derby.iapi.sql.dictionary.ConsInfo;
import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.SandeshaDynamicProperties;
import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.beans.RetransmitterBean;
@@ -43,11 +44,9 @@
RMPolicyBean policyBean = (RMPolicyBean)
messageContext.getProperty(Constants.WSP.RM_POLICY_BEAN);
if (policyBean==null){
- return retransmitterBean;
+ policyBean = new
SandeshaDynamicProperties().getPolicyBean();
}
- long oldRetransmissionTime = retransmitterBean.getTimeToSend();
-
retransmitterBean.setSentCount(retransmitterBean.getSentCount()+1);
adjustNextRetransmissionTime (retransmitterBean,policyBean);
@@ -65,12 +64,13 @@
long baseInterval = policyBean.getRetransmissionInterval();
- long timeToSendNext;
+ long newInterval = baseInterval;
if (policyBean.isExponentialBackoff()) {
- long newInterval =
generateNextExponentialBackedoffDifference (count,baseInterval);
-
retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+ newInterval =
generateNextExponentialBackedoffDifference (count,baseInterval);
}
+ retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+
return retransmitterBean;
}
@@ -82,7 +82,7 @@
//TODO: Have to change this to be plugable
private long generateNextExponentialBackedoffDifference(int count,long
initialInterval) {
long interval = initialInterval;
- for (int i=1;i<=count;i++){
+ for (int i=1;i<count;i++){
interval = interval*2;
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sat
Nov 19 09:36:36 2005
@@ -87,9 +87,18 @@
+ "'
message.");
}
- new
AxisEngine(context).send(msgCtx);
+ try {
+ new
AxisEngine(context).send(msgCtx);
+ }catch (Exception e) {
+ //Exception is sending.
retry later
+
System.out.println("Exception thrown in sending...");
+ e.printStackTrace();
+ }
+
MessageRetransmissionAdjuster
retransmitterAdjuster = new MessageRetransmissionAdjuster ();
retransmitterAdjuster.adjustRetransmittion(bean);
+
+ mgr.update(bean);
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]