Author: mlovett
Date: Wed Feb 7 02:45:23 2007
New Revision: 504494
URL: http://svn.apache.org/viewvc?view=rev&rev=504494
Log:
Make the Sender take each sequence in turn, to avoid starving them
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
Wed Feb 7 02:45:23 2007
@@ -263,8 +263,6 @@
// inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
-
SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
-
msgContext.pause();
if (log.isDebugEnabled())
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Wed Feb 7 02:45:23 2007
@@ -217,7 +217,7 @@
if (rMDBean!=null && rMDBean.isPollingMode()) {
PollingManager manager =
storageManager.getPollingManager();
-
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+ if(manager != null)
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Wed Feb 7 02:45:23 2007
@@ -180,8 +180,6 @@
if (internalSequenceId!=null)
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
- String sequencePropertyKey =
SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
/*
* checking weather the user has given the messageNumber (most
of the
* cases this will not be the case where the system will
generate the
@@ -284,7 +282,7 @@
// if first message - setup the sending side sequence -
both for the
// server and the client sides
- rmsBean =
SequenceManager.setupNewClientSequence(msgContext, sequencePropertyKey,
specVersion, storageManager);
+ rmsBean =
SequenceManager.setupNewClientSequence(msgContext, specVersion, storageManager);
EndpointReference acksToEPR = null;
@@ -331,32 +329,29 @@
if (transportIn == null)
transportIn =
org.apache.axis2.Constants.TRANSPORT_HTTP;
} else if (acksToEPR == null && serverSide) {
-// String incomingSequencId = SandeshaUtil
-//
.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
-
try {
MessageContext requestMsgContext =
operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
RMMsgContext requestRMMsgContext =
MsgInitializer.initializeMessage(requestMsgContext);
-
- String requestSideSequencePropertyKey =
SandeshaUtil.getSequencePropertyKey(requestRMMsgContext);
- RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager,
requestSideSequencePropertyKey);
-
- if (rmdBean != null &&
rmdBean.getReplyToEPR() != null) {
- String beanAcksToValue =
rmdBean.getReplyToEPR();
- if (beanAcksToValue != null)
- acksToEPR = new
EndpointReference(beanAcksToValue);
+ Sequence sequence = (Sequence)
requestRMMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if(sequence != null) {
+ String id =
sequence.getIdentifier().getIdentifier();
+ RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, id);
+
+ if (rmdBean != null &&
rmdBean.getReplyToEPR() != null) {
+ String beanAcksToValue
= rmdBean.getReplyToEPR();
+ if (beanAcksToValue !=
null)
+ acksToEPR = new
EndpointReference(beanAcksToValue);
+ }
}
} catch (AxisFault e) {
throw new SandeshaException (e);
}
}
- rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean,
sequencePropertyKey ,internalSequenceId, acksToEPR, storageManager);
+ rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean,
internalSequenceId, acksToEPR, storageManager);
}
if (rmsBean == null) {
- RMSBean findBean = new RMSBean();
- findBean.setInternalSequenceID(internalSequenceId);
- rmsBean =
storageManager.getRMSBeanMgr().findUnique(findBean);
+ rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
}
// the message number that was last used.
@@ -396,11 +391,6 @@
if (!dummyMessage)
rmsBean.setNextMessageNumber(messageNumber);
- if (messageNumber == 1 && !sendCreateSequence) {
- // Start the sender for the service side.
- SandeshaUtil.startSenderForTheSequence(configContext,
outSequenceID);
- }
-
RelatesTo relatesTo = msgContext.getRelatesTo();
if(relatesTo != null) {
rmsBean.setHighestOutRelatesTo(relatesTo.getValue());
@@ -465,7 +455,7 @@
return true;
}
- private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg,
RMSBean rmsBean, String sequencePropertyKey, String internalSequenceId,
EndpointReference acksTo,
+ private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg,
RMSBean rmsBean, String internalSequenceId, EndpointReference acksTo,
StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
@@ -543,7 +533,7 @@
createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
createSeqEntry.setTimeToSend(System.currentTimeMillis());
createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
- createSeqEntry.setInternalSequenceID(sequencePropertyKey);
+
createSeqEntry.setInternalSequenceID(rmsBean.getInternalSequenceID());
// this will be set to true in the sender
createSeqEntry.setSend(true);
// Indicate that this message is a create sequence
@@ -558,6 +548,9 @@
retransmitterMgr.insert(createSeqEntry);
+ // Setup enough of the workers to get this create sequence off
the box.
+ SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
return rmsBean;
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Wed Feb 7 02:45:23 2007
@@ -187,9 +187,7 @@
rmsBeanMgr.insert(rMSBean);
- if(rMSBean.isPollingMode()) {
-
SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(),
false);
- }
+
SandeshaUtil.startWorkersForSequence(context, rMSBean);
} else {
// removing the accept part.
@@ -215,9 +213,7 @@
storageManager.getRMDBeanMgr().update(rmdBean);
- if(rmdBean.isPollingMode()) {
-
SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(),
false);
- }
+ SandeshaUtil.startWorkersForSequence(context, rmdBean);
AxisEngine engine = new AxisEngine(context);
try{
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Wed Feb 7 02:45:23 2007
@@ -210,18 +210,12 @@
rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
rmdBeanMgr.insert(rMDBean);
-
- if(rMDBean.isPollingMode()) {
-
SandeshaUtil.startPollingForTheSequence(configCtx, rMDBean.getSequenceID(),
false);
- }
+ SandeshaUtil.startWorkersForSequence(configCtx,
rMDBean);
}
rmsBean.setLastActivatedTime(System.currentTimeMillis());
rmsBeanMgr.update(rmsBean);
-
- if(rmsBean.isPollingMode()) {
- SandeshaUtil.startPollingForTheSequence(configCtx,
rmsBean.getSequenceID(), true);
- }
+ SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
// Locate and update all of the messages for this sequence, now
that we know
// the sequence id.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
Wed Feb 7 02:45:23 2007
@@ -38,7 +38,7 @@
ConfigurationContext context =
rmMsgContext.getConfigurationContext();
StorageManager storage =
SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
PollingManager pollingManager =
storage.getPollingManager();
-
pollingManager.schedulePollingRequest(sequenceId, false);
+ if(pollingManager != null)
pollingManager.schedulePollingRequest(sequenceId, false);
}
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Wed Feb 7 02:45:23 2007
@@ -53,6 +53,7 @@
import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.workers.SandeshaThread;
import org.apache.sandesha2.wsrm.Sequence;
/**
@@ -283,11 +284,6 @@
}
}
- // inorder invocation is still a global property
- boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
-
msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
-
-
//setting properties for the messageContext
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new
Long (msgNo));
@@ -328,7 +324,10 @@
AcknowledgementManager.addAckBeanEntry(ackRMMsgContext,
sequenceId, timeToSend, storageManager);
}
- if (inOrderInvocation) {
+ // If the storage manager has an invoker, then they may be
implementing inOrder, or
+ // transactional delivery. Either way, if they have one we
should use it.
+ SandeshaThread invoker = storageManager.getInvoker();
+ if (invoker != null) {
// Whatever the MEP, we stop processing here and the
invoker will do the real work. We only
// SUSPEND if we need to keep the backchannel open for
the response... we may as well ABORT
// to let other cases end more quickly.
@@ -345,8 +344,6 @@
// This will avoid performing application processing
more than once.
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
- // Starting the invoker if stopped.
-
SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(),
sequenceId);
}
if (log.isDebugEnabled())
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Wed Feb 7 02:45:23 2007
@@ -87,8 +87,6 @@
throw new SandeshaException(message);
}
- String sequencePropertyKey =
SandeshaUtil.getSequencePropertyKey(terminateSeqRMMsg);
-
ConfigurationContext context =
terminateSeqMsg.getConfigurationContext();
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
@@ -110,9 +108,9 @@
// add the terminate sequence response if required.
RMMsgContext terminateSequenceResponse = null;
if
(SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
- terminateSequenceResponse =
getTerminateSequenceResponse(terminateSeqRMMsg, rmdBean, sequencePropertyKey,
sequenceId, storageManager);
+ terminateSequenceResponse =
getTerminateSequenceResponse(terminateSeqRMMsg, rmdBean, sequenceId,
storageManager);
- setUpHighestMsgNumbers(context,
storageManager,sequencePropertyKey, sequenceId, terminateSeqRMMsg);
+ setUpHighestMsgNumbers(context, storageManager, sequenceId,
terminateSeqRMMsg);
@@ -139,10 +137,10 @@
}
if (doFullTermination) {
-
TerminateManager.cleanReceivingSideAfterInvocation(context,
sequencePropertyKey, sequenceId, storageManager);
-
TerminateManager.cleanReceivingSideOnTerminateMessage(context,
sequencePropertyKey, sequenceId, storageManager);
+
TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId,
storageManager);
+
TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId,
storageManager);
} else
-
TerminateManager.cleanReceivingSideOnTerminateMessage(context,
sequencePropertyKey, sequenceId, storageManager);
+
TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId,
storageManager);
@@ -223,7 +221,7 @@
}
private void setUpHighestMsgNumbers(ConfigurationContext configCtx,
StorageManager storageManager,
- String requestSidesequencePropertyKey, String
sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
+ String sequenceId, RMMsgContext terminateRMMsg) throws
SandeshaException {
if (log.isDebugEnabled())
log.debug("Enter:
TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
@@ -293,7 +291,7 @@
log.debug("Exit:
TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
}
- private RMMsgContext getTerminateSequenceResponse(RMMsgContext
terminateSeqRMMsg, RMDBean rmdBean, String sequencePropertyKey,String
sequenceId,
+ private RMMsgContext getTerminateSequenceResponse(RMMsgContext
terminateSeqRMMsg, RMDBean rmdBean, String sequenceId,
StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
Wed Feb 7 02:45:23 2007
@@ -75,7 +75,7 @@
if (rMDBean!=null && rMDBean.isPollingMode()) {
PollingManager manager =
storageManager.getPollingManager();
-
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+ if(manager != null)
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
Wed Feb 7 02:45:23 2007
@@ -120,7 +120,7 @@
RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
RMSBean findRMS = new RMSBean();
- findRMS.setSequenceID(entry.getSequenceId());
+ findRMS.setInternalSequenceID(entry.getSequenceId());
findRMS.setPollingMode(true);
findRMS.setTerminated(false);
RMSBean beanToPoll = rmsBeanManager.findUnique(findRMS);
@@ -192,7 +192,7 @@
makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
makeConnectionSenderBean.setReSend(false);
makeConnectionSenderBean.setSend(true);
- makeConnectionSenderBean.setSequenceID(sequenceId);
+ makeConnectionSenderBean.setSequenceID(rmBean.getSequenceID());
EndpointReference to = makeConnectionRMMessage.getTo();
if (to!=null)
makeConnectionSenderBean.setToAddress(to.getAddress());
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
Wed Feb 7 02:45:23 2007
@@ -52,8 +52,12 @@
public void shutdown(){
//shutdown the running threads
getSender().stopRunning();
- getInvoker().stopRunning();
- getPollingManager().stopRunning();
+
+ SandeshaThread thread = getInvoker();
+ if(thread != null) thread.stopRunning();
+
+ thread = getPollingManager();
+ if(thread != null) thread.stopRunning();
}
public abstract void initStorage (AxisModule moduleDesc) throws
SandeshaStorageException;
@@ -62,8 +66,20 @@
public abstract SandeshaThread getSender();
+ /**
+ * Get the invoker that hands inbound messages over to the application.
This
+ * may be null, in which case the inbound messages will be dispatched
directly
+ * to the application without switching them over to the invoker.
+ * @return null if messages should be delivered directly to the
application,
+ * otherwise return a SandeshaThread.
+ */
public abstract SandeshaThread getInvoker();
+ /**
+ * Get the thread that generates polling requests to send to remote
endpoints.
+ * This may be null, in which case the storage manager does not support
polling.
+ * @return null if polling is diabled, otherwise return a
PollingManager.
+ */
public abstract PollingManager getPollingManager();
public abstract RMSBeanMgr getRMSBeanMgr();
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
Wed Feb 7 02:45:23 2007
@@ -40,7 +40,7 @@
public SenderBean findUnique (SenderBean bean) throws SandeshaException;
- public SenderBean getNextMsgToSend() throws SandeshaStorageException;
+ public SenderBean getNextMsgToSend(String sequenceId) throws
SandeshaStorageException;
public boolean update(SenderBean bean) throws SandeshaStorageException;
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
Wed Feb 7 02:45:23 2007
@@ -66,20 +66,19 @@
return super.find(bean);
}
- //TODO remove this method, and move this logic out of the
StorageManager. We should not hv any RM logic inside the StorageManagers.
- //Otherwise we will hv to repeat that logic inside every SM Impl.
- public SenderBean getNextMsgToSend() throws SandeshaStorageException {
+ public SenderBean getNextMsgToSend(String sequenceId) throws
SandeshaStorageException {
+ if(log.isDebugEnabled()) log.debug("Entry:
InMemorySenderBeanManager::getNextMessageToSend " + sequenceId);
+
// Set up match criteria
SenderBean matcher = new SenderBean();
matcher.setSend(true);
+ matcher.setSequenceID(sequenceId);
matcher.setTimeToSend(System.currentTimeMillis());
List matches = super.find(matcher);
+ if(log.isDebugEnabled()) log.debug("Found " + matches.size() +
" messages");
- // We either return an application message or an RM message. If
we find
- // an application message first then we carry on through the
list to be
- // sure that we send the lowest app message avaliable. If we
hit a RM
- // message first then we are done.
+ // Look for the message with the lowest send time, and send
that one.
SenderBean result = null;
Iterator i = matches.iterator();
while(i.hasNext()) {
@@ -87,20 +86,17 @@
if (bean.getTimeToSend()<0)
continue; //Beans with negative timeToSend
values are not considered as candidates for sending.
- if(bean.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
- long number = bean.getMessageNumber();
- if(result == null || result.getMessageNumber()
> number) {
- result = bean;
- }
- } else if(result == null) {
- //making sure that the bean passes the reSend
test as well
- if (bean.getSentCount()==0 ||
(bean.getSentCount()>0 && bean.isReSend())) {
- result = bean;
- break;
- }
+ if (bean.getSentCount() > 0 && !bean.isReSend())
+ continue; //Avoid re-sending messages that we
should not resend
+
+ if(result == null) {
+ result = bean;
+ } else if(result.getTimeToSend() >
bean.getTimeToSend()) {
+ result = bean;
}
}
+ if(log.isDebugEnabled()) log.debug("Exit:
InMemorySenderBeanManager::getNextMessageToSend");
return result;
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
Wed Feb 7 02:45:23 2007
@@ -75,14 +75,20 @@
SandeshaPolicyBean policy =
SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
useSerialization = policy.isUseMessageSerialization();
-
+
+ // Note that while inOrder is a global property we can decide
if we need the
+ // invoker thread at this point. If we change this to be a
sequence-level
+ // property then we'll need to revisit this.
+ boolean inOrder = policy.isInOrder();
+ boolean polling = policy.isEnableMakeConnection();
+
this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
this.rMDBeanMgr = new InMemoryRMDBeanMgr (this, context);
this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this,
context);
this.sender = new Sender();
- this.invoker = new Invoker();
- this.pollingManager = new PollingManager();
+ if(inOrder) this.invoker = new Invoker();
+ if(polling) this.pollingManager = new PollingManager();
}
public Transaction getTransaction() {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
Wed Feb 7 02:45:23 2007
@@ -28,6 +28,7 @@
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMBean;
+import org.apache.sandesha2.workers.SandeshaThread;
/**
* This class does not really implement transactions, but it is a good
@@ -57,7 +58,10 @@
public void commit() {
releaseLocks();
if(sentMessages) manager.getSender().wakeThread();
- if(receivedMessages) manager.getInvoker().wakeThread();
+ if(receivedMessages) {
+ SandeshaThread invoker = manager.getInvoker();
+ if(invoker != null) invoker.wakeThread();
+ }
}
public void rollback() {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
Wed Feb 7 02:45:23 2007
@@ -389,8 +389,6 @@
// inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
-
SandeshaUtil.startSenderForTheSequence(ackRMMsgContext.getConfigurationContext(),
sequenceId);
-
}
public static void sendAckNow (RMMsgContext ackRMMsgContext) throws
AxisFault {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
Wed Feb 7 02:45:23 2007
@@ -54,8 +54,6 @@
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceID);
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,
sequenceID);
- String sequencePropertyKey =
SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
// operation is the lowest level Sandesha2 could be attached.
SandeshaPolicyBean propertyBean =
SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation());
@@ -85,7 +83,7 @@
// Only messages of outgoing sequences get
retransmitted. So named
// following method according to that.
-
finalizeTimedOutSequence(sequencePropertyKey,internalSequenceID, sequenceID,
rmMsgCtx.getMessageContext(), storageManager);
+ finalizeTimedOutSequence(internalSequenceID,
sequenceID, rmMsgCtx.getMessageContext(), storageManager);
continueSending = false;
}
}
@@ -140,13 +138,13 @@
return interval;
}
- private static void finalizeTimedOutSequence(String sequencePropertyKey
,String internalSequenceID, String sequenceID, MessageContext messageContext,
+ private static void finalizeTimedOutSequence(String internalSequenceID,
String sequenceID, MessageContext messageContext,
StorageManager storageManager) throws SandeshaException
{
ConfigurationContext configurationContext =
messageContext.getConfigurationContext();
// Already an active transaction, so don't want a new one
SequenceReport report =
SandeshaClient.getOutgoingSequenceReport(internalSequenceID,
configurationContext, false);
- TerminateManager.timeOutSendingSideSequence(sequencePropertyKey
,internalSequenceID, false, storageManager);
+ TerminateManager.timeOutSendingSideSequence(internalSequenceID,
false, storageManager);
SandeshaListener listener = (SandeshaListener) messageContext
.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Wed Feb 7 02:45:23 2007
@@ -77,6 +77,7 @@
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
import org.apache.sandesha2.workers.SandeshaThread;
import org.apache.sandesha2.wsrm.AckRequested;
@@ -142,46 +143,39 @@
return ackRanges;
}
- public static void startSenderForTheSequence(ConfigurationContext
context, String sequenceID) throws SandeshaException {
+ public static void startWorkersForSequence(ConfigurationContext
context, RMSequenceBean sequence)
+ throws SandeshaException {
if (log.isDebugEnabled())
- log.debug("Enter:
SandeshaUtil::startSenderForTheSequence , context " + context + ", sequenceID "
+ sequenceID);
+ log.debug("Enter:
SandeshaUtil::startWorkersForSequence, sequence " + sequence);
- SandeshaThread sender = getSandeshaStorageManager(context,
context.getAxisConfiguration()).getSender();
- sender.runThreadForSequence(context, sequenceID, true);
+ StorageManager mgr = getSandeshaStorageManager(context,
context.getAxisConfiguration());
+ boolean polling = sequence.isPollingMode();
- if (log.isDebugEnabled())
- log.debug("Exit:
SandeshaUtil::startSenderForTheSequence");
- }
-
- public static void startInvokerForTheSequence(ConfigurationContext
context, String sequenceID) throws SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter:
SandeshaUtil::startInvokerForTheSequence , context " + context + ", sequenceID
" + sequenceID);
+ SandeshaThread sender = mgr.getSender();
+ SandeshaThread invoker = mgr.getInvoker();
+ SandeshaThread pollMgr = mgr.getPollingManager();
- SandeshaThread invoker = getSandeshaStorageManager(context,
context.getAxisConfiguration()).getInvoker();
- invoker.runThreadForSequence(context, sequenceID, false);
-
- if (log.isDebugEnabled())
- log.debug("Exit:
SandeshaUtil::startInvokerForTheSequence");
- }
-
- public static void startPollingForTheSequence(ConfigurationContext
configurationContext, String sequenceID, boolean rmSource) throws
SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter:
SandeshaUtil::startPollingForTheSequence , context " + configurationContext +
", sequenceID " + sequenceID + ", rmSource");
-
// Only start the polling manager if we are configured to use
MakeConnection
- SandeshaPolicyBean policy =
getPropertyBean(configurationContext.getAxisConfiguration());
- if(!policy.isEnableMakeConnection()) {
+ if(polling && pollMgr == null) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.makeConnectionDisabled);
throw new SandeshaException(message);
}
-
- SandeshaThread polling =
getSandeshaStorageManager(configurationContext,
configurationContext.getAxisConfiguration()).getPollingManager();
- polling.runThreadForSequence(configurationContext, sequenceID,
rmSource);
- if (log.isDebugEnabled())
- log.debug("Exit:
SandeshaUtil::startPollingForTheSequence");
+ if(sequence instanceof RMSBean) {
+ // We pass in the internal sequence id for internal
sequences.
+ String sequenceId =
((RMSBean)sequence).getInternalSequenceID();
+ sender.runThreadForSequence(context, sequenceId, true);
+ if(polling) pollMgr.runThreadForSequence(context,
sequenceId, true);
+ } else {
+ String sequenceId = sequence.getSequenceID();
+ sender.runThreadForSequence(context, sequenceId, false);
+ if(invoker != null)
invoker.runThreadForSequence(context, sequenceId, false);
+ if(polling) pollMgr.runThreadForSequence(context,
sequenceId, false);
+ }
+
+ if (log.isDebugEnabled()) log.debug("Exit:
SandeshaUtil::startWorkersForSequence");
}
-
+
public static String getMessageTypeString(int messageType) {
switch (messageType) {
case Sandesha2Constants.MessageTypes.CREATE_SEQ:
@@ -885,63 +879,6 @@
}
}
- /**This returns the Key used when store SequencePropertyBeans for the
passed message.
- * For the sending side this will be the internal sequence ID.
- * For the receiving side this is the sequenceId.
- *
- * @param rmMsgContext
- * @return
- */
-
- public static String getSequencePropertyKey (RMMsgContext rmMsgContext)
throws AxisFault {
- String propertyKey = (String)
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY);
- if (propertyKey!=null)
- return propertyKey;
-
- String sequenceId = (String)
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID);
- String internalSequenceId = (String)
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
-
- int type = rmMsgContext.getMessageType();
- int flow = rmMsgContext.getMessageContext().getFLOW();
-
- if (flow==MessageContext.OUT_FLOW) {
- if (isSequenceResponseMessageType(type))
- propertyKey = sequenceId;
- else
- propertyKey = internalSequenceId;
- } else if (flow==MessageContext.IN_FLOW ||
-
flow==MessageContext.IN_FAULT_FLOW) {
- if (isSequenceResponseMessageType(type))
- propertyKey = internalSequenceId;
- else
- propertyKey = sequenceId;
- } else if (flow==MessageContext.OUT_FAULT_FLOW) {
- propertyKey = internalSequenceId;
- }
-
- //TODO handler cases not covered from above.
-
- if (propertyKey==null) {
- String typeStr =
SandeshaUtil.getMessageTypeString(rmMsgContext.getMessageType());
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotFindPropertyKey,typeStr);
- throw new SandeshaException (message);
- }
-
- return propertyKey;
- }
-
- private static boolean isSequenceResponseMessageType (int messageType) {
- if
(messageType==Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE ||
- messageType==Sandesha2Constants.MessageTypes.ACK ||
-
messageType==Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE_RESPONSE ||
-
messageType==Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE) {
-
- return true;
- } else {
- return false;
- }
- }
-
public static boolean isWSRMAnonymous(String address) {
if (address!=null &&
address.startsWith(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX))
return true;
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Wed Feb 7 02:45:23 2007
@@ -87,7 +87,6 @@
}
MessageContext createSeqContext =
createSequenceMsg.getMessageContext();
- ConfigurationContext configurationContext =
createSeqContext.getConfigurationContext();
rmdBean.setServerCompletedMessages(new RangeString());
@@ -121,10 +120,6 @@
storageManager.storeMessageContext(newKey,
createSeqContext);
}
- // message to invoke. This will apply for only in-order
invocations.
-
- SandeshaUtil.startSenderForTheSequence(configurationContext,
sequenceId);
-
String messageRMNamespace = createSequence.getNamespaceValue();
String specVersion = null;
@@ -151,13 +146,12 @@
}
- public static RMSBean setupNewClientSequence(MessageContext
firstAplicationMsgCtx, String sequencePropertyKey,
+ public static RMSBean setupNewClientSequence(MessageContext
firstAplicationMsgCtx,
String specVersion, StorageManager storageManager)
throws SandeshaException {
if (log.isDebugEnabled())
- log.debug("Enter:
SequenceManager::setupNewClientSequence " + sequencePropertyKey);
+ log.debug("Enter:
SequenceManager::setupNewClientSequence");
RMSBean rmsBean = new RMSBean();
- ConfigurationContext configurationContext =
firstAplicationMsgCtx.getConfigurationContext();
EndpointReference toEPR = firstAplicationMsgCtx.getTo();
String acksTo = (String)
firstAplicationMsgCtx.getProperty(SandeshaClientConstants.AcksTo);
@@ -255,8 +249,6 @@
// updating the last activated time.
rmsBean.setLastActivatedTime(System.currentTimeMillis());
- SandeshaUtil.startSenderForTheSequence(configurationContext,
sequencePropertyKey);
-
updateClientSideListnerIfNeeded(firstAplicationMsgCtx,
anonAcks);
if (log.isDebugEnabled())
log.debug("Exit:
SequenceManager::setupNewClientSequence " + rmsBean);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Wed Feb 7 02:45:23 2007
@@ -121,7 +121,7 @@
* @param sequenceID
* @throws SandeshaException
*/
- public static void
cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String
sequencePropertyKey ,String sequenceId,
+ public static void
cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String
sequenceId,
StorageManager storageManager) throws SandeshaException
{
// clean senderMap
@@ -148,14 +148,14 @@
// there is no invoking by Sandesha2. So clean
invocations storages.
receivingSideCleanMap.put(sequenceId,
CLEANED_ON_TERMINATE_MSG);
- cleanReceivingSideAfterInvocation(configContext,
sequencePropertyKey, sequenceId, storageManager);
+ cleanReceivingSideAfterInvocation(configContext,
sequenceId, storageManager);
} else {
String cleanStatus = (String)
receivingSideCleanMap.get(sequenceId);
if (cleanStatus != null
&&
CLEANED_AFTER_INVOCATION.equals(cleanStatus))
completeTerminationOfReceivingSide(configContext,
- sequencePropertyKey,
sequenceId, storageManager);
+ sequenceId, storageManager);
else
receivingSideCleanMap.put(sequenceId,
CLEANED_ON_TERMINATE_MSG);
}
@@ -170,7 +170,7 @@
* @param sequenceID
* @throws SandeshaException
*/
- public static void
cleanReceivingSideAfterInvocation(ConfigurationContext configContext, String
sequencePropertyKey ,String sequenceId,
+ public static void
cleanReceivingSideAfterInvocation(ConfigurationContext configContext, String
sequenceId,
StorageManager storageManager) throws SandeshaException
{
InvokerBeanMgr storageMapBeanMgr =
storageManager.getInvokerBeanMgr();
@@ -195,7 +195,7 @@
String cleanStatus = (String)
receivingSideCleanMap.get(sequenceId);
if (cleanStatus != null &&
CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
- completeTerminationOfReceivingSide(configContext,
sequencePropertyKey, sequenceId, storageManager);
+ completeTerminationOfReceivingSide(configContext,
sequenceId, storageManager);
else {
receivingSideCleanMap.put(sequenceId,
CLEANED_AFTER_INVOCATION);
}
@@ -206,7 +206,7 @@
* methods.
*
*/
- private static void
completeTerminationOfReceivingSide(ConfigurationContext configContext, String
sequencePropertyKey,String sequenceId,
+ private static void
completeTerminationOfReceivingSide(ConfigurationContext configContext,String
sequenceId,
StorageManager storageManager) throws SandeshaException
{
// TODO We need to remove the RMDBean, but doing so quickly can
stop
@@ -241,20 +241,20 @@
rmsBean.setTerminated(true);
storageManager.getRMSBeanMgr().update(rmsBean);
- cleanSendingSideData (rmsBean.getSequenceID(),
rmsBean.getInternalSequenceID(), serverSide, storageManager);
+ cleanSendingSideData (rmsBean.getInternalSequenceID(),
serverSide, storageManager);
}
- public static void timeOutSendingSideSequence(String
sequencePropertyKey,String internalSequenceId,
+ public static void timeOutSendingSideSequence(String internalSequenceId,
boolean serverside, StorageManager storageManager)
throws SandeshaException {
RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
rmsBean.setTimedOut(true);
storageManager.getRMSBeanMgr().update(rmsBean);
- cleanSendingSideData(sequencePropertyKey,internalSequenceId,
serverside, storageManager);
+ cleanSendingSideData(internalSequenceId, serverside,
storageManager);
}
- private static void cleanSendingSideData(String
sequencePropertyKey,String internalSequenceId,
+ private static void cleanSendingSideData(String internalSequenceId,
boolean serverSide, StorageManager storageManager)
throws SandeshaException {
SenderBeanMgr retransmitterBeanMgr =
storageManager.getSenderBeanMgr();
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
Wed Feb 7 02:45:23 2007
@@ -57,8 +57,6 @@
msgToInvoke =
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
RMMsgContext rmMsg =
MsgInitializer.initializeMessage(msgToInvoke);
- String sequencePropertyKey =
SandeshaUtil.getSequencePropertyKey(rmMsg);
-
// ending the transaction before invocation.
if(transaction != null) {
transaction.commit();
@@ -126,10 +124,7 @@
//this will work for RM 1.0 only
highestMessage = true;
} else {
-
- RMDBean findBean = new RMDBean ();
- findBean.setSequenceID(sequenceId);
- RMDBean rmdBean =
storageManager.getRMDBeanMgr().findUnique(findBean);
+ RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
if (rmdBean!=null &&
rmdBean.isTerminated()) {
long highestInMsgNo =
rmdBean.getHighestInMessageNumber();
@@ -140,7 +135,7 @@
if (highestMessage) {
//do cleaning stuff that hs to be done
after the invocation of the last message.
-
TerminateManager.cleanReceivingSideAfterInvocation(configurationContext,
sequencePropertyKey, sequenceId, storageManager);
+
TerminateManager.cleanReceivingSideAfterInvocation(configurationContext,
sequenceId, storageManager);
// exit from current iteration. (since
an entry
// was removed)
if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::run Last message return");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
Wed Feb 7 02:45:23 2007
@@ -142,6 +142,11 @@
}
+ /**
+ * Ensure that the worker thread is aware of the given sequence. As
source sequences
+ * do not have a proper sequence id at the time they are bootstrapped,
the caller
+ * must pass in the internal sequence id when rmSource is true.
+ */
public synchronized void runThreadForSequence(ConfigurationContext
context, String sequenceID, boolean rmSource){
if(log.isDebugEnabled()) log.debug("Entry:
SandeshaThread::runThreadForSequence, " + this);
@@ -175,7 +180,10 @@
* @return a List of SequenceEntry instances
*/
public synchronized ArrayList getSequences() {
- return workingSequences;
+ // Need to copy the list for thread safety
+ ArrayList result = new ArrayList();
+ result.addAll(workingSequences);
+ return result;
}
protected synchronized boolean hasStoppedRunning() {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Wed Feb 7 02:45:23 2007
@@ -17,6 +17,8 @@
package org.apache.sandesha2.workers;
+import java.util.ArrayList;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
@@ -24,6 +26,8 @@
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
/**
@@ -35,7 +39,12 @@
public class Sender extends SandeshaThread {
private static final Log log = LogFactory.getLog(Sender.class);
-
+
+ // If this sender is working for several sequences, we use round-robin
to
+ // try and give them all a chance to invoke messages.
+ int nextIndex = 0;
+ boolean processedMessage = false;
+
public Sender () {
super(Sandesha2Constants.SENDER_SLEEP_TIME);
}
@@ -44,20 +53,63 @@
if (log.isDebugEnabled()) log.debug("Enter:
Sender::internalRun");
Transaction transaction = null;
+ boolean sleep = false;
try {
+ // Pick a sequence using a round-robin approach
+ ArrayList allSequencesList = getSequences();
+ int size = allSequencesList.size();
+ log.debug("Choosing one from " + size + " sequences");
+ if(nextIndex >= size) {
+ nextIndex = 0;
+
+ // We just looped over the set of sequences. If
we didn't process any
+ // messages on this loop then we sleep before
the next one
+ if(size == 0 || !processedMessage) {
+ sleep = true;
+ }
+ processedMessage = false;
+
+ if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, looped over all sequences, sleep " + sleep);
+ return sleep;
+ }
+
+ SequenceEntry entry = (SequenceEntry)
allSequencesList.get(nextIndex++);
+ String sequenceId = entry.getSequenceId();
+ log.debug("Chose sequence " + sequenceId);
+
transaction = storageManager.getTransaction();
+ // Check that the sequence is still valid
+ boolean found = false;
+ if(entry.isRmSource()) {
+ RMSBean matcher = new RMSBean();
+ matcher.setInternalSequenceID(sequenceId);
+ matcher.setTerminated(false);
+ RMSBean rms =
storageManager.getRMSBeanMgr().findUnique(matcher);
+ if(rms != null) {
+ sequenceId = rms.getSequenceID();
+ found = true;
+ }
+ } else {
+ RMDBean matcher = new RMDBean();
+ matcher.setSequenceID(sequenceId);
+ matcher.setTerminated(false);
+ RMDBean rmd =
storageManager.getRMDBeanMgr().findUnique(matcher);
+ if(rmd != null) found = true;
+ }
+ if (!found) {
+ stopThreadForSequence(sequenceId,
entry.isRmSource());
+ if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, sequence has ended");
+ return false;
+ }
+
SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
- SenderBean senderBean = mgr.getNextMsgToSend();
+ SenderBean senderBean =
mgr.getNextMsgToSend(sequenceId);
if (senderBean == null) {
- // As there was no work to do, we sleep for a
while on the next loop.
- if (log.isDebugEnabled()) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
- log.debug("Exit: Sender::internalRun, "
+ message + ", sleeping");
- }
- return true;
+ if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, no message for this sequence");
+ return false; // Move on to the next sequence
in the list
}
// work Id is used to define the piece of work that
will be
@@ -96,6 +148,10 @@
// makes sure
// that all the workIds in the Lock are handled by
threads.
getWorkerLock().addWork(workId);
+
+ // If we got to here then we found work to do on the
sequence, so we should
+ // remember not to sleep at the end of the list of
sequences.
+ processedMessage = true;
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]