Author: mckierna
Date: Mon Nov 19 09:07:21 2007
New Revision: 596367
URL: http://svn.apache.org/viewvc?rev=596367&view=rev
Log: (empty)
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
Mon Nov 19 09:07:21 2007
@@ -313,7 +313,7 @@
ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction
(getRMVersion()));
ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction
(getRMVersion()));
- sendOutgoingMessage(ackRequestRMMsg,
Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
+ sendOutgoingMessage(ackRequestRMMsg,
Sandesha2Constants.MessageTypes.ACK_REQUEST, 0, null);
// Pause the message context
ackRequestRMMsg.pause();
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Mon Nov 19 09:07:21 2007
@@ -43,6 +43,7 @@
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -67,6 +68,7 @@
private String inboundSequence = null;
private long inboundMessageNumber;
+ private Transaction appMsgProcTran = null;
public ApplicationMsgProcessor() {
// Nothing to do
@@ -89,6 +91,7 @@
if (log.isDebugEnabled())
log.debug("Enter:
ApplicationMsgProcessor::processOutMessage");
+ appMsgProcTran= tran;
MessageContext msgContext = rmMsgCtx.getMessageContext();
ConfigurationContext configContext =
msgContext.getConfigurationContext();
@@ -244,6 +247,7 @@
String outSequenceID = null;
+
// Work out if there is a user transaction involved before
updating any store state
// to give any storage manager interface a chance to setup any
transactional state
boolean hasUserTransaction =
storageManager.hasUserTransaction(msgContext);
@@ -264,16 +268,18 @@
if (rmsBean == null) {
rmsBean =
SequenceManager.setupNewClientSequence(msgContext, internalSequenceId,
storageManager);
rmsBean =
addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
-
- if (rmsBean == null && tran !=
null && tran.isActive()) {
+ if(rmsBean != null)
outSequenceID = rmsBean.getSequenceID();
+
+ if (rmsBean == null &&
appMsgProcTran != null && appMsgProcTran.isActive()) {
// Rollback the current
locks.
- tran.rollback();
+
appMsgProcTran.rollback();
// Create a new tran.
This avoids a potential deadlock where the RMS/RMDBeans
// are taken in reverse
order.
- tran =
storageManager.getTransaction();
+ appMsgProcTran =
storageManager.getTransaction();
}
}
+
}
} else {
@@ -435,15 +441,17 @@
//reliable message. If he doesn't have a endpoint he
should use polling mechanisms.
msgContext.pause();
- if (tran != null && tran.isActive()) {
- tran.commit();
- tran = null;
+ if (appMsgProcTran != null &&
appMsgProcTran.isActive()) {
+ appMsgProcTran.commit();
+ appMsgProcTran = null;
}
}
+
finally {
- if (tran != null && tran.isActive())
- tran.rollback();
+ if (appMsgProcTran != null && appMsgProcTran.isActive())
+ appMsgProcTran.rollback();
}
+
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
return true;
@@ -458,7 +466,7 @@
MessageContext applicationMsg =
applicationRMMsg.getMessageContext();
ConfigurationContext configCtx =
applicationMsg.getConfigurationContext();
- // generating a new create sequeuce message.
+ // generating a new create sequence message.
RMMsgContext createSeqRMMessage =
RMMsgCreator.createCreateSeqMsg(rmsBean, applicationRMMsg);
createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
@@ -474,7 +482,7 @@
MessageContext createSeqMsg =
createSeqRMMessage.getMessageContext();
createSeqMsg.setRelationships(null); // create seq msg does not
-
// relateTo anything
+
// relateTo anything
String createSequenceMessageStoreKey = SandeshaUtil.getUUID();
// the key that will be used to store
//the create
sequence message.
@@ -517,20 +525,76 @@
SandeshaUtil.executeAndStore(createSeqRMMessage,
createSequenceMessageStoreKey, storageManager);
storageManager.getSenderBeanMgr().insert(createSeqEntry);
-
+
+ if(appMsgProcTran != null &&
createSeqRMMessage.getMessageId() != null &&
!storageManager.hasUserTransaction(createSeqMsg)) {
+
+ // Lock the sender bean before we insert it, if
we are planning to send it ourselves
+ String workId = createSeqEntry.getMessageID() +
createSeqEntry.getTimeToSend();
+ SandeshaThread sender =
storageManager.getSender();
+
+ ConfigurationContext context =
createSeqMsg.getConfigurationContext();
+ WorkerLock lock = sender.getWorkerLock();
+
+ SenderWorker worker = new SenderWorker(context,
createSeqEntry, rmsBean.getRMVersion());
+ worker.setLock(lock);
+ worker.setWorkId(workId);
+ // Actually take the lock
+ lock.addWork(workId, worker);
+
+ // Commit the transaction, so that the sender
worker starts with a clean slate.
+ if(appMsgProcTran.isActive())
appMsgProcTran.commit();
+
+ if(worker != null) {
+ try {
+ worker.run();
+ } catch(Exception e) {
+ log.error("Caught exception running
SandeshaWorker", e);
+ }
+ }
+
+ //Create transaction
+ appMsgProcTran =
storageManager.getTransaction();
+
+ //Find RMSBean
+ RMSBeanMgr rmsBeanMgr =
storageManager.getRMSBeanMgr();
+ RMSBean tempRMSBean = new RMSBean();
+
tempRMSBean.setInternalSequenceID(rmsBean.getInternalSequenceID());
+ rmsBean = rmsBeanMgr.findUnique(tempRMSBean);
+
+ // If the RMSBean has been terminated this
means that we may
+ // well have encountered a problem sending this
message
+ if (rmsBean == null || rmsBean.isTerminated()){
+
+ if (log.isDebugEnabled())
+ log.debug("Exit:
ApplicationMsgProcessor::addCreateSequenceMessage, Failed to establish sequence
" + rmsBean);
+
+ if (rmsBean != null &&
rmsBean.getLastSendError() != null) {
+ if (rmsBean.getLastSendError()
instanceof AxisFault)
+ throw
(AxisFault)rmsBean.getLastSendError();
+ }
+ if (rmsBean.getLastSendError() != null)
+ throw new
AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused),
+
rmsBean.getLastSendError());
+
+ throw new
AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused));
+
+ }
+ }
// Setup enough of the workers to get this create
sequence off the box.
SandeshaUtil.startWorkersForSequence(configCtx,
rmsBean);
} else {
rmsBean = null;
}
-
+
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
+
return rmsBean;
}
private void processResponseMessage(RMMsgContext rmMsg, RMSBean
rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
String storageKey, StorageManager storageManager,
Transaction tran, boolean hasUserTransaction) throws AxisFault {
+
if (log.isDebugEnabled())
log.debug("Enter:
ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", "
+ outSequenceID);
@@ -623,7 +687,7 @@
}
// Commit the transaction, so that the sender worker starts
with a clean slate.
- if(tran != null && tran.isActive()) tran.commit();
+ if(appMsgProcTran != null && appMsgProcTran.isActive())
appMsgProcTran.commit();
if(worker != null) {
try {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
Mon Nov 19 09:07:21 2007
@@ -179,7 +179,7 @@
rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction
(getRMVersion()));
// Send this outgoing message
- sendOutgoingMessage(rmMsgCtx,
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
+ sendOutgoingMessage(rmMsgCtx,
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0, transaction);
// Pause the message context
rmMsgCtx.pause();
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Mon Nov 19 09:07:21 2007
@@ -44,6 +44,7 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -56,6 +57,9 @@
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.TerminateSequence;
@@ -399,8 +403,8 @@
// Set a retransmitter lastSentTime so that terminate will be
send with
// some delay.
// Otherwise this get send before return of the current request
(ack).
- // TODO: refine the terminate delay.
- sendOutgoingMessage(rmMsgCtx,
Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
Sandesha2Constants.TERMINATE_DELAY);
+ // TODO: refine the terminate delay.
+ sendOutgoingMessage(rmMsgCtx,
Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
Sandesha2Constants.TERMINATE_DELAY, transaction);
// Pause the message context
rmMsgCtx.pause();
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
Mon Nov 19 09:07:21 2007
@@ -771,17 +771,14 @@
return;
}
- /* if (rmsBean.getLastSendError() == null) {
- // Indicate that there was an error when sending the
Create Sequence.
- rmsBean.setLastSendError(fault);
+ // Indicate that there was an error when sending the Create
Sequence.
+ rmsBean.setLastSendError(fault);
+ // Mark the sequence as terminated
+ rmsBean.setTerminated(true);
- // Update the RMSBean
- rmsBeanMgr.update(rmsBean);
- if (log.isDebugEnabled())
- log.debug("Exit:
FaultManager::processCreateSequenceRefusedFault Allowing another CreateSequence
attempt");
- return;
- }
-*/
+ // Update the RMSBean
+ rmsBeanMgr.update(rmsBean);
+
SenderBean createSequenceSenderBean =
retransmitterMgr.retrieve(createSeqMsgId);
if (createSequenceSenderBean == null)
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
Mon Nov 19 09:07:21 2007
@@ -34,9 +34,13 @@
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
public class WSRMMessageSender {
@@ -117,7 +121,7 @@
}
- protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType,
long delay) throws AxisFault {
+ protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType,
long delay, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
@@ -154,6 +158,7 @@
{
senderBean.setSend(true);
senderBean.setSequenceID(outSequenceID);
+
}
else
senderBean.setSend(false);
@@ -173,10 +178,36 @@
SenderBeanMgr retramsmitterMgr =
storageManager.getSenderBeanMgr();
- SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
+ SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
retramsmitterMgr.insert(senderBean);
-
+
+ if (sequenceExists &&
!storageManager.hasUserTransaction(msgContext)) {
+
+ String workId = msgContext.getMessageID()
+ + senderBean.getTimeToSend();
+ SandeshaThread sender = storageManager.getSender();
+ WorkerLock lock = sender.getWorkerLock();
+
+ SenderWorker worker = new
SenderWorker(configurationContext,
+ senderBean, rmsBean.getRMVersion());
+ worker.setLock(lock);
+ worker.setWorkId(workId);
+ // Actually take the lock
+ lock.addWork(workId, worker);
+
+ // Commit the transaction, so that the sender worker
starts with a clean state
+ if (transaction != null && transaction.isActive())
+ transaction.commit();
+
+ if (worker != null) {
+ try {
+ worker.run();
+ } catch (Exception e) {
+ log.error("Caught exception running
SandeshaWorker", e);
+ }
+ }
+ }
if (log.isDebugEnabled())
log.debug("Exit:
WSRMParentProcessor::sendOutgoingMessage");
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
Mon Nov 19 09:07:21 2007
@@ -47,7 +47,7 @@
private boolean stopRequested = false;
private int sleepTime;
- private WorkerLock lock = null;
+ private WorkerLock lock = null;
private ArrayList workingSequences = new ArrayList();
@@ -56,9 +56,11 @@
protected StorageManager storageManager = null;
private boolean reRunThread;
+
public SandeshaThread(int sleepTime) {
this.sleepTime = sleepTime;
- lock = new WorkerLock ();
+ this.setDaemon(true);
+ lock = new WorkerLock ();
}
public final WorkerLock getWorkerLock() {
@@ -91,7 +93,7 @@
//ignore
}
}
-
+
//we can now request a pause - the next pause will be our
pauseRequired = true;
Modified:
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java
(original)
+++
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java
Mon Nov 19 09:07:21 2007
@@ -19,6 +19,7 @@
import java.io.File;
import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
@@ -99,7 +100,12 @@
serviceClient.setOptions(clientOptions);
TestCallback callback1 = new TestCallback ("Callback 1");
- serviceClient.sendReceiveNonBlocking
(getEchoOMBlock("echo1",sequenceKey),callback1);
+ boolean caughtException = false;
+ try {
+ serviceClient.sendReceiveNonBlocking
(getEchoOMBlock("echo1",sequenceKey),callback1);
+ } catch (AxisFault e) {
+ caughtException = true;
+ }
long limit = System.currentTimeMillis() + waitTime;
Error lastError = null;
@@ -112,7 +118,9 @@
assertEquals(sequenceReport.getSequenceStatus(),SequenceReport.SEQUENCE_STATUS_TERMINATED);
assertEquals(sequenceReport.getSequenceDirection(),SequenceReport.SEQUENCE_DIRECTION_OUT);
- assertTrue(callback1.isErrorReported());
+ if (!caughtException)
+ assertTrue(callback1.isErrorReported());
+
assertEquals(callback1.getResult(),null);
lastError = null;
@@ -130,4 +138,6 @@
}
}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]