Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=diff&rev=469431&r1=469430&r2=469431 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Tue Oct 31 01:50:15 2006 @@ -31,6 +31,7 @@ import org.apache.sandesha2.client.SandeshaClientConstants; import org.apache.sandesha2.i18n.SandeshaMessageHelper; import org.apache.sandesha2.i18n.SandeshaMessageKeys; +import org.apache.sandesha2.msgprocessors.AckRequestedProcessor; import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor; import org.apache.sandesha2.msgprocessors.MsgProcessor; import org.apache.sandesha2.msgprocessors.MsgProcessorFactory; @@ -150,6 +151,12 @@ if (msgProcessor != null){ if(msgProcessor.processOutMessage(rmMsgCtx)){ + //the msg was paused + returnValue = InvocationResponse.SUSPEND; + } + } else if (messageType==Sandesha2Constants.MessageTypes.ACK_REQUEST) { + AckRequestedProcessor ackRequestedProcessor = new AckRequestedProcessor (); + if(ackRequestedProcessor.processOutgoingAckRequestMessage (rmMsgCtx)){ //the msg was paused returnValue = InvocationResponse.SUSPEND; }
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=469431&r1=469430&r2=469431 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Tue Oct 31 01:50:15 2006 @@ -248,4 +248,8 @@ public final static String invalidOfferNoResponseEndpoint = "invalidOfferNoResponseEndpoint"; public final static String invalidElementFoundWithinElement = "invalidElementFoundWithinElement"; + public final static String couldNotSendAckRequestSeqNotFound="couldNotSendAckRequestSeqNotFound"; + public final static String couldNotSendCloseResponse="couldNotSendCloseResponse"; + public final static String couldNotSendCloseSeqNotFound="couldNotSendCloseSeqNotFound"; + } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=469431&r1=469430&r2=469431 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Tue Oct 31 01:50:15 2006 @@ -114,7 +114,10 @@ couldNotSendClose=Could not send the close sequence message due to error {0}. couldNotSendAck=Could not send the ack message on sequence {0} due to an exception: {1} couldNotSendTerminateResponse=Could not send the terminate sequence response due to exception {0}. +couldNotSendCloseResponse=Could not send the CloseSequenceResponse due to exception {0}. couldNotSendTerminateSeqNotFound=Internal sequenceID {0} was not found: cannot send the terminate message. +couldNotSendCloseSeqNotFound=Internal sequenceID {0} was not found: cannot send the CloseSequence message. +couldNotSendAckRequestSeqNotFound=Internal sequenceID {0} was not found: cannot send the AckRequest message. couldNotSendFault=Could not send the fault message due to an exception: {0} cannotSendAckRequestNotActive=Cannot send the ackRequest message since the sequence with internal ID {0} is not active. cannotSendAckRequestException=Could not send the ackRequest message on sequence {0} due to an exception: {1} Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=469431&r1=469430&r2=469431 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Tue Oct 31 01:50:15 2006 @@ -30,16 +30,22 @@ import org.apache.axis2.Constants; import org.apache.axis2.addressing.AddressingConstants; import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.client.Options; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; +import org.apache.axis2.context.MessageContextConstants; import org.apache.axis2.context.OperationContext; +import org.apache.axis2.context.OperationContextFactory; import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.description.OutInAxisOperation; import org.apache.axis2.engine.AxisEngine; +import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sandesha2.RMMsgContext; import org.apache.sandesha2.Sandesha2Constants; import org.apache.sandesha2.SandeshaException; +import org.apache.sandesha2.client.SandeshaClientConstants; import org.apache.sandesha2.i18n.SandeshaMessageHelper; import org.apache.sandesha2.i18n.SandeshaMessageKeys; import org.apache.sandesha2.policy.SandeshaPolicyBean; @@ -285,6 +291,160 @@ return true; } return false; + } + + /** + * This is used to capture AckRequest messages send by the SandeshaClient. + * This will send that message using the Sandesha2 Sender. + * + * @param rmMsgContext + */ + public boolean processOutgoingAckRequestMessage (RMMsgContext ackRequestRMMsg) throws AxisFault { + + if (log.isDebugEnabled()) + log.debug("Enter: AckRequestedProcessor::processOutgoingAckRequestMessage"); + + MessageContext msgContext = ackRequestRMMsg.getMessageContext(); + ConfigurationContext configurationContext = msgContext.getConfigurationContext(); + Options options = msgContext.getOptions(); + + StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, + configurationContext.getAxisConfiguration()); + + SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr(); + + String toAddress = ackRequestRMMsg.getTo().getAddress(); + String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY); + String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey); + + String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID, + Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager); + if (outSequenceID == null) + throw new SandeshaException(SandeshaMessageHelper.getMessage( + SandeshaMessageKeys.couldNotSendAckRequestSeqNotFound, internalSeqenceID)); + + + // registring an InOutOperationContext for this. + // since the serviceContext.fireAndForget only sets a inOnly One + // this does not work when there is a terminateSequnceResponse + // TODO do processing of terminateMessagesCorrectly., create a new + // message instead of sendign the one given by the serviceClient + // TODO important + + AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp")); + + AxisOperation referenceInOutOperation = msgContext.getAxisService() + .getOperation( + new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME)); + if (referenceInOutOperation == null) { + String messge = "Cant find the recerence RM InOut operation"; + throw new SandeshaException(messge); + } + + outInAxisOp.setParent(msgContext.getAxisService()); + // setting flows + // outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow()); + outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation + .getRemainingPhasesInFlow()); + + OperationContext opcontext = OperationContextFactory + .createOperationContext( + WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp); + opcontext.setParent(msgContext.getServiceContext()); + configurationContext.registerOperationContext(ackRequestRMMsg.getMessageId(), + opcontext); + + msgContext.setOperationContext(opcontext); + msgContext.setAxisOperation(outInAxisOp); + + Iterator iterator = ackRequestRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST); + + AckRequested ackRequested = null; + while (iterator.hasNext()) { + ackRequested = (AckRequested) iterator.next(); + } + + if (iterator.hasNext()) { + String message = "Passed message has more than one AckRequest. You can have only one"; + throw new SandeshaException (message); + } + + if (ackRequested==null) { + String message = "No AckRequested part was present in the message"; + throw new SandeshaException (message); + } + + ackRequested.getIdentifier().setIndentifer(outSequenceID); + + ackRequestRMMsg.setFlow(MessageContext.OUT_FLOW); + msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true"); + + ackRequestRMMsg.setTo(new EndpointReference(toAddress)); + + String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID, storageManager); + if (rmVersion == null) + throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion)); + + ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (rmVersion)); + ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (rmVersion)); + + String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID, + Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager); + if (transportTo != null) { + ackRequestRMMsg.setProperty(MessageContextConstants.TRANSPORT_URL, transportTo); + } + + + //setting msg context properties + ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID); + ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSeqenceID); + ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey); + + try { + ackRequestRMMsg.addSOAPEnvelope(); + } catch (AxisFault e) { + throw new SandeshaException(e.getMessage(),e); + } + + String key = SandeshaUtil.getUUID(); + + SenderBean ackRequestBean = new SenderBean(); + ackRequestBean.setMessageContextRefKey(key); + + storageManager.storeMessageContext(key, msgContext); + + // Set a retransmitter lastSentTime so that terminate will be send with + // some delay. + // Otherwise this get send before return of the current request (ack). + // TODO: refine the terminate delay. + ackRequestBean.setTimeToSend(System.currentTimeMillis()); + + ackRequestBean.setMessageID(msgContext.getMessageID()); + + EndpointReference to = msgContext.getTo(); + if (to!=null) + ackRequestBean.setToAddress(to.getAddress()); + + // this will be set to true at the sender. + ackRequestBean.setSend(true); + + msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE); + + ackRequestBean.setReSend(false); + + SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr(); + + retramsmitterMgr.insert(ackRequestBean); + + ackRequestRMMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE); + + SandeshaUtil.executeAndStore(ackRequestRMMsg, key); + + if (log.isDebugEnabled()) + log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.FALSE); + + return true; + } } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=469431&r1=469430&r2=469431 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Tue Oct 31 01:50:15 2006 @@ -19,33 +19,50 @@ import java.util.Iterator; +import javax.xml.namespace.QName; + import org.apache.axiom.om.OMElement; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPFactory; import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.client.Options; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; +import org.apache.axis2.context.MessageContextConstants; +import org.apache.axis2.context.OperationContext; +import org.apache.axis2.context.OperationContextFactory; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.description.OutInAxisOperation; import org.apache.axis2.engine.AxisEngine; import org.apache.axis2.util.Utils; +import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sandesha2.RMMsgContext; import org.apache.sandesha2.Sandesha2Constants; import org.apache.sandesha2.SandeshaException; +import org.apache.sandesha2.client.SandeshaClientConstants; import org.apache.sandesha2.i18n.SandeshaMessageHelper; import org.apache.sandesha2.i18n.SandeshaMessageKeys; +import org.apache.sandesha2.msgreceivers.RMMessageReceiver; import org.apache.sandesha2.security.SecurityManager; import org.apache.sandesha2.security.SecurityToken; import org.apache.sandesha2.storage.StorageManager; +import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr; +import org.apache.sandesha2.storage.beans.SenderBean; import org.apache.sandesha2.storage.beans.SequencePropertyBean; import org.apache.sandesha2.util.AcknowledgementManager; import org.apache.sandesha2.util.FaultManager; import org.apache.sandesha2.util.RMMsgCreator; import org.apache.sandesha2.util.SOAPAbstractFactory; import org.apache.sandesha2.util.SandeshaUtil; +import org.apache.sandesha2.util.SpecSpecificConstants; import org.apache.sandesha2.wsrm.CloseSequence; +import org.apache.sandesha2.wsrm.Identifier; import org.apache.sandesha2.wsrm.SequenceAcknowledgement; +import org.apache.sandesha2.wsrm.TerminateSequence; /** * Responsible for processing an incoming Close Sequence message. (As introduced @@ -148,7 +165,7 @@ try { engine.send(closeSequenceResponseMsg); } catch (AxisFault e) { - String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse, + String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendCloseResponse, sequenceId, e.toString()); throw new SandeshaException(message, e); } @@ -158,12 +175,142 @@ return false; } - public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException { + public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault { + if (log.isDebugEnabled()) { log.debug("Enter: CloseSequenceProcessor::processOutMessage"); log.debug("Exit: CloseSequenceProcessor::processOutMessage " + Boolean.FALSE); } - return false; + + MessageContext msgContext = rmMsgCtx.getMessageContext(); + ConfigurationContext configurationContext = msgContext.getConfigurationContext(); + Options options = msgContext.getOptions(); + + StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, + configurationContext.getAxisConfiguration()); + + SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr(); + + String toAddress = rmMsgCtx.getTo().getAddress(); + String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY); + String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey); + + String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID, + Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager); + if (outSequenceID == null) + throw new SandeshaException(SandeshaMessageHelper.getMessage( + SandeshaMessageKeys.couldNotSendCloseSeqNotFound, internalSeqenceID)); + + + // registring an InOutOperationContext for this. + // since the serviceContext.fireAndForget only sets a inOnly One + // this does not work when there is a closeSequnceResponse + // TODO do processing of closeMessagesCorrectly., create a new + // message instead of sendign the one given by the serviceClient + // TODO important + + AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp")); + + AxisOperation referenceInOutOperation = msgContext.getAxisService() + .getOperation( + new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME)); + if (referenceInOutOperation == null) { + String messge = "Cant find the recerence RM InOut operation"; + throw new SandeshaException(messge); + } + + outInAxisOp.setParent(msgContext.getAxisService()); + // setting flows + // outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow()); + outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation + .getRemainingPhasesInFlow()); + + outInAxisOp.setMessageReceiver(new RMMessageReceiver ()); + + OperationContext opcontext = OperationContextFactory + .createOperationContext( + WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp); + opcontext.setParent(msgContext.getServiceContext()); + configurationContext.registerOperationContext(rmMsgCtx.getMessageId(), + opcontext); + + msgContext.setOperationContext(opcontext); + msgContext.setAxisOperation(outInAxisOp); + + CloseSequence closeSequencePart = (CloseSequence) rmMsgCtx + .getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE); + Identifier identifier = closeSequencePart.getIdentifier(); + if (identifier==null) { + identifier = new Identifier (closeSequencePart.getNamespaceValue()); + closeSequencePart.setIdentifier(identifier); + } + + identifier.setIndentifer(outSequenceID); + + rmMsgCtx.setFlow(MessageContext.OUT_FLOW); + msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true"); + + rmMsgCtx.setTo(new EndpointReference(toAddress)); + + String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID, storageManager); + if (rmVersion == null) + throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion)); + + rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(rmVersion)); + rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (rmVersion)); + + String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID, + Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager); + if (transportTo != null) { + rmMsgCtx.setProperty(MessageContextConstants.TRANSPORT_URL, transportTo); + } + + //setting msg context properties + rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID); + rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSeqenceID); + rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY , sequenceKey); + + try { + rmMsgCtx.addSOAPEnvelope(); + } catch (AxisFault e) { + throw new SandeshaException(e.getMessage(),e); + } + + String key = SandeshaUtil.getUUID(); + + SenderBean closeBean = new SenderBean(); + closeBean.setMessageContextRefKey(key); + + storageManager.storeMessageContext(key, msgContext); + + closeBean.setTimeToSend(System.currentTimeMillis()); + + closeBean.setMessageID(msgContext.getMessageID()); + + EndpointReference to = msgContext.getTo(); + if (to!=null) + closeBean.setToAddress(to.getAddress()); + + // this will be set to true at the sender. + closeBean.setSend(true); + + msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE); + + closeBean.setReSend(false); + + SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr(); + + retramsmitterMgr.insert(closeBean); + + + rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE); + + SandeshaUtil.executeAndStore(rmMsgCtx, key); + + if (log.isDebugEnabled()) + log.debug("Exit: CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE); + + return true; } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckRequested.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckRequested.java?view=diff&rev=469431&r1=469430&r2=469431 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckRequested.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/AckRequested.java Tue Oct 31 01:50:15 2006 @@ -17,6 +17,8 @@ package org.apache.sandesha2.wsrm; +import java.util.Iterator; + import javax.xml.namespace.QName; import org.apache.axiom.om.OMElement; @@ -79,6 +81,21 @@ OMFactory factory = header.getOMFactory(); OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); + Iterator iter = header.getChildrenWithName(new QName (namespaceValue,Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED)); + while (iter.hasNext()) { + OMElement ackRequestedElement = (OMElement) iter.next(); + + OMElement identifierElement = ackRequestedElement.getFirstChildWithName(new QName (namespaceValue, + Sandesha2Constants.WSRM_COMMON.IDENTIFIER)); + String identifierVal = null; + if (identifierElement!=null) + identifierVal = identifierElement.getText(); + + if (identifierVal!=null && identifierVal.equals(identifier.getIdentifier())) + ackRequestedElement.detach(); + + } + SOAPHeader SOAPHdr = (SOAPHeader) header; SOAPHeaderBlock ackReqHdrBlock = SOAPHdr.addHeaderBlock(Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED, rmNamespace); ackReqHdrBlock.setMustUnderstand(isMustUnderstand()); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
