Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java?rev=406396&r1=406395&r2=406396&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java Sun May 14 11:34:48 2006 @@ -203,7 +203,7 @@ */ public static RMMsgContext createCreateSeqMsg( RMMsgContext applicationRMMsg, String internalSequenceId, - String acksTo) throws SandeshaException { + String acksTo,StorageManager storageManager) throws SandeshaException { MessageContext applicationMsgContext = applicationRMMsg .getMessageContext(); @@ -217,8 +217,6 @@ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil .getSOAPVersion(applicationMsgContext.getEnvelope())); - StorageManager storageManager = SandeshaUtil - .getSandeshaStorageManager(context); SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); MessageContext createSeqmsgContext; @@ -268,13 +266,13 @@ RMMsgContext createSeqRMMsg = new RMMsgContext(createSeqmsgContext); - String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,context); + String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,storageManager); if (rmVersion==null) throw new SandeshaException ("Cant find the rmVersion of the given message"); String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion); - String addressingNamespaceValue = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,context); + String addressingNamespaceValue = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager); CreateSequence createSequencePart = new CreateSequence(factory,rmNamespaceValue,addressingNamespaceValue); @@ -336,8 +334,8 @@ } - createSeqRMMsg.setAction(SpecSpecificConstants.getCreateSequenceAction(SandeshaUtil.getRMVersion(internalSequenceId,context))); - createSeqRMMsg.setSOAPAction(SpecSpecificConstants.getCreateSequenceSOAPAction(SandeshaUtil.getRMVersion(internalSequenceId,context))); + createSeqRMMsg.setAction(SpecSpecificConstants.getCreateSequenceAction(SandeshaUtil.getRMVersion(internalSequenceId,storageManager))); + createSeqRMMsg.setSOAPAction(SpecSpecificConstants.getCreateSequenceSOAPAction(SandeshaUtil.getRMVersion(internalSequenceId,storageManager))); finalizeCreation(applicationMsgContext, createSeqmsgContext); @@ -353,7 +351,7 @@ * @throws SandeshaException */ public static RMMsgContext createTerminateSequenceMessage( - RMMsgContext referenceRMMessage, String sequenceId, String internalSequenceID) + RMMsgContext referenceRMMessage, String sequenceId, String internalSequenceID,StorageManager storageManager) throws SandeshaException { MessageContext referenceMessage = referenceRMMessage .getMessageContext(); @@ -397,7 +395,7 @@ .getRemainingPhasesInFlow()); } - String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID,configCtx); + String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID,storageManager); if (rmVersion==null) throw new SandeshaException ("Cant find the rmVersion of the given message"); @@ -462,7 +460,7 @@ */ public static RMMsgContext createCreateSeqResponseMsg( RMMsgContext createSeqMessage, MessageContext outMessage, - String newSequenceID) throws AxisFault { + String newSequenceID,StorageManager storageManager) throws AxisFault { SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil .getSOAPVersion(createSeqMessage.getSOAPEnvelope())); @@ -473,12 +471,12 @@ .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ); CreateSequence cs = (CreateSequence) messagePart; - String rmVersion = SandeshaUtil.getRMVersion(newSequenceID,configurationContext); + String rmVersion = SandeshaUtil.getRMVersion(newSequenceID,storageManager); if (rmVersion==null) throw new SandeshaException ("Cant find the rmVersion of the given message"); String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion); - String addressingNamespaceValue = SandeshaUtil.getSequenceProperty(newSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configurationContext); + String addressingNamespaceValue = SandeshaUtil.getSequenceProperty(newSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager); CreateSequenceResponse response = new CreateSequenceResponse(factory,rmNamespaceValue,addressingNamespaceValue); @@ -507,8 +505,8 @@ SOAPEnvelope envelope = factory.getDefaultEnvelope(); response.toOMElement(envelope.getBody()); - outMessage.setWSAAction(SpecSpecificConstants.getCreateSequenceResponseAction(SandeshaUtil.getRMVersion(newSequenceID,configurationContext))); - outMessage.setSoapAction(SpecSpecificConstants.getCreateSequenceResponseSOAPAction(SandeshaUtil.getRMVersion(newSequenceID,configurationContext))); + outMessage.setWSAAction(SpecSpecificConstants.getCreateSequenceResponseAction(SandeshaUtil.getRMVersion(newSequenceID,storageManager))); + outMessage.setSoapAction(SpecSpecificConstants.getCreateSequenceResponseSOAPAction(SandeshaUtil.getRMVersion(newSequenceID,storageManager))); outMessage.setProperty(AddressingConstants.WS_ADDRESSING_VERSION,addressingNamespaceValue); String newMessageId = SandeshaUtil.getUUID(); @@ -534,7 +532,7 @@ } - public static RMMsgContext createTerminateSeqResponseMsg (RMMsgContext terminateSeqRMMsg, MessageContext outMessage) throws SandeshaException { + public static RMMsgContext createTerminateSeqResponseMsg (RMMsgContext terminateSeqRMMsg, MessageContext outMessage,StorageManager storageManager) throws SandeshaException { RMMsgContext terminateSeqResponseRMMsg = new RMMsgContext (outMessage); ConfigurationContext configurationContext = terminateSeqRMMsg.getMessageContext().getConfigurationContext(); @@ -557,8 +555,8 @@ terminateSeqResponseRMMsg.setSOAPEnvelop(envelope); terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE,terminateSequenceResponse); - outMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,configurationContext))); - outMessage.setSoapAction(SpecSpecificConstants.getTerminateSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,configurationContext))); + outMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,storageManager))); + outMessage.setSoapAction(SpecSpecificConstants.getTerminateSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,storageManager))); initializeCreation(terminateSeqRMMsg.getMessageContext(),outMessage); @@ -572,7 +570,7 @@ } - public static RMMsgContext createCloseSeqResponseMsg (RMMsgContext closeSeqRMMsg, MessageContext outMessage) throws SandeshaException { + public static RMMsgContext createCloseSeqResponseMsg (RMMsgContext closeSeqRMMsg, MessageContext outMessage,StorageManager storageManager) throws SandeshaException { RMMsgContext closeSeqResponseRMMsg = new RMMsgContext (outMessage); ConfigurationContext configurationContext = closeSeqRMMsg.getMessageContext().getConfigurationContext(); @@ -595,8 +593,8 @@ closeSeqResponseRMMsg.setSOAPEnvelop(envelope); closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE_RESPONSE,closeSequenceResponse); - outMessage.setWSAAction(SpecSpecificConstants.getCloseSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,configurationContext))); - outMessage.setSoapAction(SpecSpecificConstants.getCloseSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,configurationContext))); + outMessage.setWSAAction(SpecSpecificConstants.getCloseSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,storageManager))); + outMessage.setSoapAction(SpecSpecificConstants.getCloseSequenceResponseAction(SandeshaUtil.getRMVersion(sequenceID,storageManager))); initializeCreation(closeSeqRMMsg.getMessageContext(),outMessage); @@ -617,7 +615,7 @@ * @throws SandeshaException */ public static void addAckMessage(RMMsgContext applicationMsg, - String sequenceId) throws SandeshaException { + String sequenceId,StorageManager storageManager) throws SandeshaException { SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil .getSOAPVersion(applicationMsg.getSOAPEnvelope())); @@ -632,7 +630,7 @@ ConfigurationContext ctx = applicationMsg.getMessageContext() .getConfigurationContext(); - String rmVersion = SandeshaUtil.getRMVersion(sequenceId,ctx); + String rmVersion = SandeshaUtil.getRMVersion(sequenceId,storageManager); if (rmVersion==null) throw new SandeshaException ("Cant find the rmVersion of the given message"); @@ -644,9 +642,6 @@ id.setIndentifer(sequenceId); sequenceAck.setIdentifier(id); - - StorageManager storageManager = SandeshaUtil - .getSandeshaStorageManager(ctx); SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); @@ -678,9 +673,9 @@ sequenceAck.toOMElement(envelope.getHeader()); applicationMsg - .setAction(SpecSpecificConstants.getSequenceAcknowledgementAction(SandeshaUtil.getRMVersion(sequenceId,ctx))); + .setAction(SpecSpecificConstants.getSequenceAcknowledgementAction(SandeshaUtil.getRMVersion(sequenceId,storageManager))); applicationMsg - .setSOAPAction(SpecSpecificConstants.getSequenceAcknowledgementSOAPAction(SandeshaUtil.getRMVersion(sequenceId,ctx))); + .setSOAPAction(SpecSpecificConstants.getSequenceAcknowledgementSOAPAction(SandeshaUtil.getRMVersion(sequenceId,storageManager))); applicationMsg.setMessageId(SandeshaUtil.getUUID()); } @@ -691,7 +686,7 @@ * @return * @throws SandeshaException */ - public static RMMsgContext createAckMessage(RMMsgContext relatedRMMessage, String sequenceID, String rmNamespaceValue) + public static RMMsgContext createAckMessage(RMMsgContext relatedRMMessage, String sequenceID, String rmNamespaceValue,StorageManager storageManager) throws SandeshaException { try { @@ -710,7 +705,7 @@ initializeCreation(applicationMsgCtx, ackMsgCtx); - addAckMessage(ackRMMsgCtx, sequenceID); + addAckMessage(ackRMMsgCtx, sequenceID,storageManager); ackMsgCtx.setProperty(MessageContext.TRANSPORT_IN,null);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java?rev=406396&r1=406395&r2=406396&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaPropertyBean.java Sun May 14 11:34:48 2006 @@ -30,10 +30,12 @@ public class SandeshaPropertyBean { RMPolicyBean policyBean = new RMPolicyBean (); - String storageManagerClass = null; +// String storageManagerClass = null; boolean inOrder = true; ArrayList msgTypesToDrop = null; - + private String inMemoryStorageManagerClass = null; + private String permanentStorageManagerClass = null; + public long getInactiveTimeoutInterval() { return policyBean.getInactiveTimeoutInterval(); } @@ -86,12 +88,28 @@ policyBean.setAcknowledgementInterval(acknowledgementInterval); } - public String getStorageManagerClass() { - return storageManagerClass; +// public String getStorageManagerClass() { +// return storageManagerClass; +// } +// +// public void setStorageManagerClass(String storageManagerClass) { +// this.storageManagerClass = storageManagerClass; +// } + + public String getInMemoryStorageManagerClass() { + return inMemoryStorageManagerClass; + } + + public void setInMemoryStorageManagerClass(String inMemoryStorageManagerClass) { + this.inMemoryStorageManagerClass = inMemoryStorageManagerClass; + } + + public String getPermanentStorageManagerClass() { + return permanentStorageManagerClass; } - public void setStorageManagerClass(String storageManagerClass) { - this.storageManagerClass = storageManagerClass; + public void setPermanentStorageManagerClass(String permanentStorageManagerClass) { + this.permanentStorageManagerClass = permanentStorageManagerClass; } public RMPolicyBean getPolicyBean () { Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=406396&r1=406395&r2=406396&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Sun May 14 11:34:48 2006 @@ -326,18 +326,56 @@ * @return * @throws SandeshaException */ - public static StorageManager getSandeshaStorageManager(ConfigurationContext context) throws SandeshaException { + public static StorageManager getSandeshaStorageManager(ConfigurationContext context,AxisDescription description) throws SandeshaException { - StorageManager storageManager = (StorageManager) context.getProperty(Sandesha2Constants.STORAGE_MANAGER); - if (storageManager != null) - return storageManager; + Parameter parameter = description.getParameter(Sandesha2Constants.STORAGE_MANAGER_PARAMETER); + if (parameter==null) + parameter = new Parameter (Sandesha2Constants.STORAGE_MANAGER_PARAMETER,Sandesha2Constants.DEFAULT_STORAGE_MANAGER); + + String value = (String) parameter.getValue(); + if (Sandesha2Constants.INMEMORY_STORAGE_MANAGER.equals(value)) + return getInMemoryStorageManager(context); + else if (Sandesha2Constants.PERMANENT_STORAGE_MANAGER.equals(value)) + return getPermanentStorageManager(context); + else + throw new SandeshaException ("Unknown StorageManager type. Please check your parameter value."); + } + + public static StorageManager getInMemoryStorageManager(ConfigurationContext context) throws SandeshaException { + + StorageManager inMemoryStorageManager = (StorageManager) context.getProperty(Sandesha2Constants.INMEMORY_STORAGE_MANAGER); + if (inMemoryStorageManager != null) + return inMemoryStorageManager; //Currently module policies (default) are used to find the storage manager. These cant be overriden //TODO change this so that different services can hv different storage managers. - String srotageManagerClassStr = getDefaultPropertyBean(context.getAxisConfiguration()).getStorageManagerClass(); + String storageManagerClassStr = getDefaultPropertyBean(context.getAxisConfiguration()).getInMemoryStorageManagerClass(); + inMemoryStorageManager = getStorageManagerInstance(storageManagerClassStr,context); + context.setProperty(Sandesha2Constants.INMEMORY_STORAGE_MANAGER,inMemoryStorageManager); + + return inMemoryStorageManager; + } + + public static StorageManager getPermanentStorageManager(ConfigurationContext context) throws SandeshaException { + + StorageManager permanentStorageManager = (StorageManager) context.getProperty(Sandesha2Constants.PERMANENT_STORAGE_MANAGER); + if (permanentStorageManager != null) + return permanentStorageManager; + //Currently module policies (default) are used to find the storage manager. These cant be overriden + //TODO change this so that different services can hv different storage managers. + String storageManagerClassStr = getDefaultPropertyBean(context.getAxisConfiguration()).getPermanentStorageManagerClass (); + permanentStorageManager = getStorageManagerInstance(storageManagerClassStr,context); + context.setProperty(Sandesha2Constants.PERMANENT_STORAGE_MANAGER,permanentStorageManager); + + return permanentStorageManager; + } + + private static StorageManager getStorageManagerInstance (String className,ConfigurationContext context) throws SandeshaException { + + StorageManager storageManager = null; try { - Class c = Class.forName(srotageManagerClassStr); + Class c = Class.forName(className); Class configContextClass = Class.forName(context.getClass().getName()); Constructor constructor = c.getConstructor(new Class[] { configContextClass }); Object obj = constructor.newInstance(new Object[] { context }); @@ -347,7 +385,6 @@ StorageManager mgr = (StorageManager) obj; storageManager = mgr; - context.setProperty(Sandesha2Constants.STORAGE_MANAGER,storageManager); return storageManager; } catch (Exception e) { @@ -643,9 +680,8 @@ } public static String getSequenceIDFromInternalSequenceID(String internalSequenceID, - ConfigurationContext configurationContext) throws SandeshaException { + StorageManager storageManager) throws SandeshaException { - StorageManager storageManager = getSandeshaStorageManager(configurationContext); SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean outSequenceBean = sequencePropertyBeanMgr.retrieve(internalSequenceID, @@ -771,9 +807,8 @@ * @return * @throws SandeshaException */ - public static String getRMVersion(String propertyKey, ConfigurationContext configurationContext) + public static String getRMVersion(String propertyKey, StorageManager storageManager) throws SandeshaException { - StorageManager storageManager = getSandeshaStorageManager(configurationContext); SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean specVersionBean = sequencePropertyBeanMgr.retrieve(propertyKey, @@ -785,9 +820,8 @@ return specVersionBean.getValue(); } - public static String getSequenceProperty(String id, String name, ConfigurationContext context) + public static String getSequenceProperty(String id, String name, StorageManager storageManager) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context); SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean sequencePropertyBean = sequencePropertyBeanMgr.retrieve(id, name); @@ -798,10 +832,10 @@ } public static boolean isAllMsgsAckedUpto(long highestInMsgNo, String internalSequenceID, - ConfigurationContext configCtx) throws SandeshaException { + StorageManager storageManager) throws SandeshaException { String clientCompletedMessages = getSequenceProperty(internalSequenceID, - Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES, configCtx); + Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES, storageManager); ArrayList ackedMsgsList = getArrayListFromString(clientCompletedMessages); long smallestMsgNo = 1; Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=406396&r1=406395&r2=406396&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Sun May 14 11:34:48 2006 @@ -45,7 +45,7 @@ private static Log log = LogFactory.getLog(SequenceManager.class); - public static String setupNewSequence(RMMsgContext createSequenceMsg) + public static String setupNewSequence(RMMsgContext createSequenceMsg,StorageManager storageManager) throws AxisFault { String sequenceId = SandeshaUtil.getUUID(); @@ -76,15 +76,9 @@ throw new AxisFault(message); } - StorageManager storageManager = null; ConfigurationContext configurationContext = createSequenceMsg.getMessageContext() .getConfigurationContext(); - try { - storageManager = SandeshaUtil - .getSandeshaStorageManager(configurationContext); - } catch (SandeshaException e) { - e.printStackTrace(); - } + SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); @@ -164,15 +158,12 @@ } public static void setupNewClientSequence( - MessageContext firstAplicationMsgCtx, String internalSequenceId, String specVersion) + MessageContext firstAplicationMsgCtx, String internalSequenceId, String specVersion,StorageManager storageManager) throws SandeshaException { ConfigurationContext configurationContext = firstAplicationMsgCtx .getConfigurationContext(); - StorageManager storageManager = SandeshaUtil - .getSandeshaStorageManager(configurationContext); - SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); @@ -308,7 +299,7 @@ seqPropMgr.insert(specVerionBean); //updating the last activated time. - updateLastActivatedTime(internalSequenceId,configurationContext); + updateLastActivatedTime(internalSequenceId,storageManager); SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId); @@ -358,8 +349,7 @@ * @param configContext * @throws SandeshaException */ - public static void updateLastActivatedTime (String propertyKey, ConfigurationContext configContext) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); + public static void updateLastActivatedTime (String propertyKey, StorageManager storageManager) throws SandeshaException { //Transaction lastActivatedTransaction = storageManager.getTransaction(); SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr(); @@ -385,9 +375,8 @@ } - public static long getLastActivatedTime (String propertyKey, ConfigurationContext configContext) throws SandeshaException { + public static long getLastActivatedTime (String propertyKey, StorageManager storageManager) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(propertyKey,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME); @@ -401,7 +390,7 @@ return lastActivatedTime; } - public static boolean hasSequenceTimedOut (String propertyKey, RMMsgContext rmMsgCtx) throws SandeshaException { + public static boolean hasSequenceTimedOut (String propertyKey, RMMsgContext rmMsgCtx,StorageManager storageManager) throws SandeshaException { //operation is the lowest level, Sandesha2 could be engaged. SandeshaPropertyBean propertyBean = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation()); @@ -411,7 +400,7 @@ boolean sequenceTimedOut = false; - long lastActivatedTime = getLastActivatedTime(propertyKey,rmMsgCtx.getMessageContext().getConfigurationContext()); + long lastActivatedTime = getLastActivatedTime(propertyKey,storageManager); long timeNow = System.currentTimeMillis(); if (lastActivatedTime>0 && (lastActivatedTime+propertyBean.getInactiveTimeoutInterval()<timeNow)) sequenceTimedOut = true; @@ -419,8 +408,7 @@ return sequenceTimedOut; } - public static long getOutGoingSequenceAckedMessageCount (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext); + public static long getOutGoingSequenceAckedMessageCount (String internalSequenceID,StorageManager storageManager) throws SandeshaException { /// Transaction transaction = storageManager.getTransaction(); SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); @@ -454,8 +442,7 @@ return noOfMessagesAcked; } - public static boolean isOutGoingSequenceCompleted (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext); + public static boolean isOutGoingSequenceCompleted (String internalSequenceID,StorageManager storageManager) throws SandeshaException { /// Transaction transaction = storageManager.getTransaction(); SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); @@ -490,9 +477,8 @@ return false; } - public static boolean isIncomingSequenceCompleted (String sequenceID, ConfigurationContext configurationContext) throws SandeshaException { + public static boolean isIncomingSequenceCompleted (String sequenceID, StorageManager storageManager) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext); /// Transaction transaction = storageManager.getTransaction(); SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=406396&r1=406395&r2=406396&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java Sun May 14 11:34:48 2006 @@ -73,7 +73,7 @@ * @param sequenceID * @throws SandeshaException */ - public static void cleanReceivingSideOnTerminateMessage (ConfigurationContext configContext, String sequenceID) throws SandeshaException { + public static void cleanReceivingSideOnTerminateMessage (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException { //clean senderMap //Currently in-order invocation is done for default values. @@ -81,12 +81,12 @@ if(!inOrderInvocation) { //there is no invoking by Sandesha2. So clean invocations storages. - cleanReceivingSideAfterInvocation(configContext,sequenceID); + cleanReceivingSideAfterInvocation(configContext,sequenceID,storageManager); } String cleanStatus = (String) receivingSideCleanMap.get(sequenceID); if (cleanStatus!=null && CLEANED_AFTER_INVOCATION.equals(cleanStatus)) - completeTerminationOfReceivingSide(configContext,sequenceID); + completeTerminationOfReceivingSide(configContext,sequenceID,storageManager); else { receivingSideCleanMap.put(sequenceID,CLEANED_ON_TERMINATE_MSG); } @@ -100,8 +100,7 @@ * @param sequenceID * @throws SandeshaException */ - public static void cleanReceivingSideAfterInvocation (ConfigurationContext configContext, String sequenceID) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); + public static void cleanReceivingSideAfterInvocation (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException { InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr(); //removing storageMap entries @@ -123,7 +122,7 @@ String cleanStatus = (String) receivingSideCleanMap.get(sequenceID); if (cleanStatus!=null && CLEANED_ON_TERMINATE_MSG.equals(cleanStatus)) - completeTerminationOfReceivingSide(configContext,sequenceID); + completeTerminationOfReceivingSide(configContext,sequenceID,storageManager); else { receivingSideCleanMap.put(sequenceID,CLEANED_AFTER_INVOCATION); } @@ -133,8 +132,7 @@ * This has to be called by the lastly invocated one of the above two methods. * */ - private static void completeTerminationOfReceivingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); + private static void completeTerminationOfReceivingSide (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException { InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr(); NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr(); @@ -149,16 +147,15 @@ } //removing the HighestInMessage entry. - String highestInMessageKey = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,configContext); + String highestInMessageKey = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,storageManager); if (highestInMessageKey!=null) { storageManager.removeMessageContext(highestInMessageKey); } - removeReceivingSideProperties(configContext,sequenceID); + removeReceivingSideProperties(configContext,sequenceID,storageManager); } - private static void removeReceivingSideProperties (ConfigurationContext configContext, String sequenceID) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); + private static void removeReceivingSideProperties (ConfigurationContext configContext, String sequenceID,StorageManager storageManager) throws SandeshaException { SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean allSequenceBean = sequencePropertyBeanMgr.retrieve(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST); @@ -193,16 +190,14 @@ * @param sequenceID * @throws SandeshaException */ - public static void terminateSendingSide (ConfigurationContext configContext, String internalSequenceID,boolean serverSide) throws SandeshaException { - - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); + public static void terminateSendingSide (ConfigurationContext configContext, String internalSequenceID,boolean serverSide,StorageManager storageManager) throws SandeshaException { SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE); seqPropMgr.insert(seqTerminatedBean); - cleanSendingSideData(configContext,internalSequenceID,serverSide); + cleanSendingSideData(configContext,internalSequenceID,serverSide,storageManager); } private static void doUpdatesIfNeeded (String sequenceID, SequencePropertyBean propertyBean, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException { @@ -268,30 +263,28 @@ return deleatable; } - public static void timeOutSendingSideSequence (ConfigurationContext context,String internalSequenceID, boolean serverside) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context); + public static void timeOutSendingSideSequence (ConfigurationContext context,String internalSequenceID, boolean serverside,StorageManager storageManager) throws SandeshaException { SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT,Sandesha2Constants.VALUE_TRUE); seqPropMgr.insert(seqTerminatedBean); - cleanSendingSideData(context,internalSequenceID,serverside); + cleanSendingSideData(context,internalSequenceID,serverside,storageManager); } - private static void cleanSendingSideData (ConfigurationContext configContext,String internalSequenceID, boolean serverSide) throws SandeshaException { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); + private static void cleanSendingSideData (ConfigurationContext configContext,String internalSequenceID, boolean serverSide,StorageManager storageManager) throws SandeshaException { SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr(); SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr(); CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr(); - String outSequenceID = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,configContext); + String outSequenceID = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,storageManager); if (!serverSide) { boolean stopListnerForAsyncAcks = false; SequencePropertyBean acksToBean = sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR); - String addressingNamespace = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configContext); + String addressingNamespace = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager); String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace); if (acksToBean!=null) { @@ -340,12 +333,10 @@ } public static void addTerminateSequenceMessage(RMMsgContext referenceMessage, - String outSequenceId, String internalSequenceId) + String outSequenceId, String internalSequenceId,StorageManager storageManager) throws SandeshaException { ConfigurationContext configurationContext = referenceMessage.getMessageContext().getConfigurationContext(); - StorageManager storageManager = SandeshaUtil - .getSandeshaStorageManager(configurationContext); /// Transaction addTerminateSeqTransaction = storageManager.getTransaction(); @@ -362,7 +353,7 @@ } RMMsgContext terminateRMMessage = RMMsgCreator - .createTerminateSequenceMessage(referenceMessage, outSequenceId,internalSequenceId); + .createTerminateSequenceMessage(referenceMessage, outSequenceId,internalSequenceId,storageManager); terminateRMMessage.setFlow(MessageContext.OUT_FLOW); terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true"); @@ -377,10 +368,10 @@ terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress())); - String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configurationContext); + String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,storageManager); String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI); - String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext); + String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,storageManager); if (rmVersion==null) throw new SandeshaException ("Cant find the rmVersion of the given message"); terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion)); @@ -404,7 +395,6 @@ storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext()); - //Set a retransmitter lastSentTime so that terminate will be send with // some delay. @@ -423,8 +413,7 @@ terminateBean.setReSend(false); - SenderBeanMgr retramsmitterMgr = storageManager - .getRetransmitterBeanMgr(); + SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr(); retramsmitterMgr.insert(terminateBean); @@ -449,7 +438,6 @@ } catch (AxisFault e) { throw new SandeshaException (e.getMessage()); } - } } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=406396&r1=406395&r2=406396&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Sun May 14 11:34:48 2006 @@ -104,7 +104,7 @@ boolean rolebacked = false; try { - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context); + StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration()); NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr(); InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr(); @@ -196,7 +196,7 @@ Sequence sequence = (Sequence) rmMsg .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); if (sequence.getLastMessage() != null) { - TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId); + TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId,storageManager); //exit from current iteration. (since an entry was removed) break currentIteration; } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=406396&r1=406395&r2=406396&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun May 14 11:34:48 2006 @@ -82,7 +82,7 @@ StorageManager storageManager = null; try { - storageManager = SandeshaUtil.getSandeshaStorageManager(context); + storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration()); } catch (SandeshaException e2) { // TODO Auto-generated catch block log.debug("ERROR: Could not start sender"); @@ -124,7 +124,7 @@ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE); MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster(); - boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context); + boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context,storageManager); if (!continueSending) { continue; } @@ -171,7 +171,7 @@ if (isAckPiggybackableMsgType(messageType) && !isAckAlreadyPiggybacked(rmMsgCtx)) { // piggybacking if an ack if available for the same sequence. //TODO do piggybacking based on wsa:To - AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx); + AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx,storageManager); } //sending the message @@ -225,8 +225,8 @@ String sequenceID = terminateSequence.getIdentifier().getIdentifier(); ConfigurationContext configContext = msgCtx.getConfigurationContext(); - String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configContext); - TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide()); + String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,storageManager); + TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide(),storageManager); } msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
