Author: chamikara
Date: Thu Sep 14 23:05:25 2006
New Revision: 446528
URL: http://svn.apache.org/viewvc?view=rev&rev=446528
Log:
Applied the patch from Matt.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
Thu Sep 14 23:05:25 2006
@@ -17,8 +17,6 @@
package org.apache.sandesha2.handlers;
-import javax.xml.namespace.QName;
-
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -89,6 +87,14 @@
return;
}
}
+ // Also do not apply RM to fault messages
+ {
+ if(msgCtx.isProcessingFault()) {
+ if(log.isDebugEnabled())
+ log.debug("Exit:
SandeshaOutHandler::invoke, Skipping sandesha processing for fault message");
+ return;
+ }
+ }
String DONE = (String)
msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
if (null != DONE && "true".equals(DONE)) {
@@ -116,11 +122,6 @@
try {
// getting rm message
RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(msgCtx);
-
- String dummyMessageString = (String)
msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
- boolean dummyMessage = false;
- if (dummyMessageString != null &&
Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
- dummyMessage = true;
MsgProcessor msgProcessor = null;
int messageType = rmMsgCtx.getMessageType();
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
Thu Sep 14 23:05:25 2006
@@ -64,8 +64,8 @@
public static final String cannotInnitMessage="cannotInnitMessage";
public static final String propertyInvalidValue="propertyInvalidValue";
public static final String
couldNotCopyParameters="couldNotCopyParameters";
- public static final String senderBeanNotFound="couldNotCopyParameters";
- public static final String workAlreadyAssigned="couldNotCopyParameters";
+ public static final String senderBeanNotFound="senderBeanNotFound";
+ public static final String workAlreadyAssigned="workAlreadyAssigned";
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
Thu Sep 14 23:05:25 2006
@@ -82,7 +82,7 @@
sendHasUnavailableMsgEntry=Sandesha2 Internal error: sender has an unavailable
message entry {0}.
cannotInnitMessage=Sandesha2 Internal error: cannot initialize the message.
propertyInvalidValue=Sandesha2 Internal error: property {0} contains an
invalid value.
-couldNotCopyParameters=Could not copy parameters when creating the new RM
Message.
+couldNotCopyParameters=Could not copy parameters when creating the new RM
Message. See the following exception for more details: {0}.
#-------------------------------------
#
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Thu Sep 14 23:05:25 2006
@@ -20,10 +20,8 @@
import java.util.ArrayList;
import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.impl.llom.OMElementImpl;
import org.apache.axiom.soap.SOAPBody;
import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
@@ -48,7 +46,6 @@
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
@@ -75,7 +72,6 @@
import org.apache.sandesha2.wsrm.LastMessage;
import org.apache.sandesha2.wsrm.MessageNumber;
import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.SequenceOffer;
import org.apache.sandesha2.wsrm.UsesSequenceSTR;
@@ -85,8 +81,6 @@
public class ApplicationMsgProcessor implements MsgProcessor {
- private boolean letInvoke = false;
-
private static final Log log =
LogFactory.getLog(ApplicationMsgProcessor.class);
public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
@@ -550,7 +544,7 @@
else if (systemMessageNumber > 0) { // if system message number
is valid
// use it.
messageNumber = systemMessageNumber + 1;
- } else { // This is the fist message (systemMessageNumber = -1)
+ } else { // This is the first message (systemMessageNumber = -1)
messageNumber = 1;
}
@@ -652,7 +646,7 @@
// send.
}
- // if fist message - setup the sending side sequence -
both for the
+ // if first message - setup the sending side sequence -
both for the
// server and the client sides
SequenceManager.setupNewClientSequence(msgContext,
sequencePropertyKey, specVersion, storageManager);
}
@@ -917,8 +911,6 @@
log.debug("Enter:
ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId);
MessageContext msg = rmMsg.getMessageContext();
- SOAPFactory factory =
SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(rmMsg.getSOAPEnvelope()));
- ConfigurationContext configurationContext =
rmMsg.getMessageContext().getConfigurationContext();
SequencePropertyBeanMgr sequencePropertyMgr =
storageManager.getSequencePropertyBeanMgr();
SenderBeanMgr retransmitterMgr =
storageManager.getRetransmitterBeanMgr();
@@ -984,7 +976,6 @@
msgNumber.setMessageNumber(messageNumber);
sequence.setMessageNumber(msgNumber);
- boolean lastMessage = false;
// setting last message
if (msg.isServerSide()) {
MessageContext requestMsg = null;
@@ -1005,9 +996,7 @@
}
if (requestSequence.getLastMessage() != null) {
- lastMessage = true;
sequence.setLastMessage(new
LastMessage(rmNamespaceValue));
-
}
} else {
@@ -1017,7 +1006,6 @@
if (operationContext != null) {
Object obj =
msg.getProperty(SandeshaClientConstants.LAST_MESSAGE);
if (obj != null && "true".equals(obj)) {
- lastMessage = true;
SequencePropertyBean specVersionBean =
sequencePropertyMgr.retrieve(internalSequenceId,
Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
Thu Sep 14 23:05:25 2006
@@ -71,6 +71,8 @@
*/
public static void piggybackAcksIfPresent(RMMsgContext
rmMessageContext, StorageManager storageManager)
throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter:
AcknowledgementManager::piggybackAcksIfPresent");
ConfigurationContext configurationContext =
rmMessageContext.getConfigurationContext();
@@ -105,6 +107,8 @@
continue piggybackLoop;
}
+ if (log.isDebugEnabled()) log.debug("Adding ack
headers");
+
// deleting the ack entry.
retransmitterBeanMgr.delete(ackBean.getMessageID());
@@ -130,6 +134,9 @@
}
}
}
+
+ if (log.isDebugEnabled())
+ log.debug("Exit:
AcknowledgementManager::piggybackAcksIfPresent");
}
/**
@@ -143,6 +150,8 @@
*/
public static ArrayList getClientCompletedMessagesList(String
sequenceID, SequencePropertyBeanMgr seqPropMgr)
throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter:
AcknowledgementManager::getClientCompletedMessagesList");
// first trying to get it from the internal sequence id.
SequencePropertyBean internalSequenceBean =
seqPropMgr.retrieve(sequenceID,
@@ -165,14 +174,20 @@
completedMsgList =
SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
} else {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull,
sequenceID);
- throw new SandeshaException(message);
+ SandeshaException e = new SandeshaException(message);
+ if(log.isDebugEnabled()) log.debug("Throwing
exception", e);
+ throw e;
}
+ if (log.isDebugEnabled())
+ log.debug("Exit:
AcknowledgementManager::getClientCompletedMessagesList");
return completedMsgList;
}
public static ArrayList getServerCompletedMessagesList(String
sequenceID, SequencePropertyBeanMgr seqPropMgr)
throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter:
AcknowledgementManager::getServerCompletedMessagesList");
SequencePropertyBean completedMessagesBean = null;
@@ -184,14 +199,20 @@
completedMsgList =
SandeshaUtil.getArrayListFromMsgsString(completedMessagesBean.getValue());
} else {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.completedMsgBeanIsNull,
sequenceID);
- throw new SandeshaException(message);
+ SandeshaException e = new SandeshaException(message);
+ if(log.isDebugEnabled()) log.debug("Throwing
exception", e);
+ throw e;
}
+ if (log.isDebugEnabled())
+ log.debug("Exit:
AcknowledgementManager::getServerCompletedMessagesList");
return completedMsgList;
}
public static RMMsgContext generateAckMessage(RMMsgContext
referenceRMMessage, String sequencePropertyKey ,String sequenceId,
StorageManager storageManager) throws SandeshaException
{
+ if (log.isDebugEnabled())
+ log.debug("Enter:
AcknowledgementManager::generateAckMessage");
MessageContext referenceMsg =
referenceRMMessage.getMessageContext();
@@ -292,7 +313,6 @@
referenceRMMessage.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN,
"true");
ackRMMsgCtx.getMessageContext().setServerSide(true);
- return ackRMMsgCtx;
} else {
@@ -359,14 +379,20 @@
ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,
Sandesha2Constants.VALUE_TRUE);
ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
ackMsgCtx.setTransportOut(new
Sandesha2TransportOutDesc());
- RMMsgContext ackRMMessageCtx =
MsgInitializer.initializeMessage(ackMsgCtx);
+ ackRMMsgCtx =
MsgInitializer.initializeMessage(ackMsgCtx);
SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
- return ackRMMessageCtx;
}
+
+ if (log.isDebugEnabled())
+ log.debug("Exit:
AcknowledgementManager::generateAckMessage");
+ return ackRMMsgCtx;
}
public static boolean verifySequenceCompletion(Iterator
ackRangesIterator, long lastMessageNo) {
+ if (log.isDebugEnabled())
+ log.debug("Enter:
AcknowledgementManager::verifySequenceCompletion");
+
HashMap startMap = new HashMap();
while (ackRangesIterator.hasNext()) {
@@ -375,21 +401,22 @@
}
long start = 1;
- boolean loop = true;
- while (loop) {
+ boolean result = false;
+ while (!result) {
AcknowledgementRange temp = (AcknowledgementRange)
startMap.get(new Long(start));
if (temp == null) {
- loop = false;
- continue;
+ break;
}
if (temp.getUpperValue() >= lastMessageNo)
- return true;
+ result = true;
start = temp.getUpperValue() + 1;
}
- return false;
+ if (log.isDebugEnabled())
+ log.debug("Enter:
AcknowledgementManager::verifySequenceCompletion " + result);
+ return result;
}
public static void addFinalAcknowledgement () {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
Thu Sep 14 23:05:25 2006
@@ -130,8 +130,8 @@
}
} catch (AxisFault e) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotCopyParameters);
- log.error(message);
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotCopyParameters,
e.getLocalizedMessage());
+ log.error(message, e);
throw new SandeshaException(message, e);
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
Thu Sep 14 23:05:25 2006
@@ -164,6 +164,10 @@
private void internalRun() {
if (log.isDebugEnabled())
log.debug("Enter: InOrderInvoker::internalRun");
+
+ // If this invoker is working for several sequences, we use
round-robin to
+ // try and give them all a chance to invoke messages.
+ int nextIndex = 0;
while (isInvokerStarted()) {
@@ -203,16 +207,15 @@
continue;
}
- //TODO pick the sequence randomly.
+ // Pick a sequence using a round-robin approach
ArrayList allSequencesList = SandeshaUtil
.getArrayListFromString(allSequencesBean.getValue());
-
-
int size = allSequencesList.size();
- Random r = new Random ();
- int index = r.nextInt(size);
-
- String sequenceId = (String)
allSequencesList.get(index);
+ if(nextIndex >= size) {
+ nextIndex = 0;
+ if (size == 0) continue;
+ }
+ String sequenceId = (String)
allSequencesList.get(nextIndex++);
NextMsgBean nextMsgBean =
nextMsgMgr.retrieve(sequenceId);
@@ -252,8 +255,7 @@
//check weather the bean is already
assigned to a worker.
if (lock.isWorkPresent(workId)) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned);
- message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
log.debug(message);
continue;
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=446528&r1=446527&r2=446528
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Thu Sep 14 23:05:25 2006
@@ -170,7 +170,6 @@
if (senderBean == null) {
if (log.isDebugEnabled()) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
- message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
log.debug(message);
}
continue;
@@ -184,9 +183,10 @@
//check weather the bean is already assigned to
a worker.
if (lock.isWorkPresent(workId)) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned);
- message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
- log.debug(message);
+ if (log.isDebugEnabled()) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
+ log.debug(message);
+ }
continue;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]