Author: chamikara
Date: Sun Aug 20 22:07:59 2006
New Revision: 433150
URL: http://svn.apache.org/viewvc?rev=433150&view=rev
Log:
Made sender and Invoker thread pools. Still the pool size is 1.
(hope to bring this into a policy after some testing).
This will allow sandesha2 to send and invoke multiple message concurrently.
This also eliminates some deadlock scenarios which existed before.
Sender, and Invoker classes manage thread pools.
SenderWorker and InvokerWorker does the sending and invoking of a single
message respectively.
(old InOrderInvoker class was removed).
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Removed:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
Sun Aug 20 22:07:59 2006
@@ -27,4 +27,6 @@
public void rollback ();
+ //indicates that the transaction has been started, but has not been
committed or rolledbacked yet.
+ public boolean isActive ();
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
Sun Aug 20 22:07:59 2006
@@ -33,5 +33,9 @@
public void rollback() {
}
+
+ public boolean isActive () {
+ return false;
+ }
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Sun Aug 20 22:07:59 2006
@@ -65,7 +65,7 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.workers.InOrderInvoker;
+import org.apache.sandesha2.workers.Invoker;
import org.apache.sandesha2.workers.Sender;
import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
@@ -230,24 +230,24 @@
public static void startInvokerForTheSequence(ConfigurationContext
context, String sequenceID) {
- InOrderInvoker invoker = (InOrderInvoker)
context.getProperty(Sandesha2Constants.INVOKER);
+ Invoker invoker = (Invoker)
context.getProperty(Sandesha2Constants.INVOKER);
if (invoker!=null)
invoker.runInvokerForTheSequence(context,sequenceID);
else {
- invoker = new InOrderInvoker ();
+ invoker = new Invoker ();
context.setProperty(Sandesha2Constants.INVOKER,invoker);
invoker.runInvokerForTheSequence(context,sequenceID);
}
}
private static void stopInvokerForTheSequence(String sequenceID,
ConfigurationContext context) {
- InOrderInvoker invoker = (InOrderInvoker)
context.getProperty(Sandesha2Constants.INVOKER);
+ Invoker invoker = (Invoker)
context.getProperty(Sandesha2Constants.INVOKER);
if (invoker!=null)
invoker.stopInvokerForTheSequence(sequenceID);
}
public static void stopInvoker(ConfigurationContext context) {
- InOrderInvoker invoker = (InOrderInvoker)
context.getProperty(Sandesha2Constants.INVOKER);
+ Invoker invoker = (Invoker)
context.getProperty(Sandesha2Constants.INVOKER);
if (invoker!=null)
invoker.stopInvoking();
}
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?rev=433150&view=auto
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
(added)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
Sun Aug 20 22:07:59 2006
@@ -0,0 +1,266 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.axis2.util.threadpool.ThreadPool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * This is used when InOrder invocation is required. This is a seperated Thread
+ * that keep running all the time. At each iteration it checks the InvokerTable
+ * to find weather there are any messages to me invoked.
+ */
+
+public class Invoker extends Thread {
+
+ private boolean runInvoker = false;
+ private ArrayList workingSequences = new ArrayList();
+ private ConfigurationContext context = null;
+ private static final Log log = LogFactory.getLog(Invoker.class);
+ private boolean hasStopped = false;
+
+ private transient ThreadFactory threadPool;
+ public int INVOKER_THREADPOOL_SIZE =5;
+
+ public Invoker () {
+ threadPool = new ThreadPool
(INVOKER_THREADPOOL_SIZE,INVOKER_THREADPOOL_SIZE);
+ }
+
+ public synchronized void stopInvokerForTheSequence(String sequenceID) {
+ if (log.isDebugEnabled())
+ log.debug("Enter:
InOrderInvoker::stopInvokerForTheSequence, " + sequenceID);
+
+ workingSequences.remove(sequenceID);
+ if (workingSequences.size() == 0) {
+ runInvoker = false;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit:
InOrderInvoker::stopInvokerForTheSequence");
+ }
+
+ public synchronized void stopInvoking() {
+ if (log.isDebugEnabled())
+ log.debug("Enter: InOrderInvoker::stopInvoking");
+
+ if (isInvokerStarted()) {
+ // the invoker is started so stop it
+ runInvoker = false;
+ // wait for it to finish
+ while (!hasStoppedInvoking()) {
+ try {
+
wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+ } catch (InterruptedException e1) {
+ log.debug(e1.getMessage());
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: InOrderInvoker::stopInvoking");
+ }
+
+ public synchronized boolean isInvokerStarted() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: InOrderInvoker::isInvokerStarted");
+ log.debug("Exit: InOrderInvoker::isInvokerStarted, " +
runInvoker);
+ }
+ return runInvoker;
+ }
+
+ public synchronized void runInvokerForTheSequence(ConfigurationContext
context, String sequenceID) {
+ if (log.isDebugEnabled())
+ log.debug("Enter:
InOrderInvoker::runInvokerForTheSequence");
+
+ if (!workingSequences.contains(sequenceID))
+ workingSequences.add(sequenceID);
+
+ if (!isInvokerStarted()) {
+ this.context = context;
+ runInvoker = true; // so that isSenderStarted()=true.
+ super.start();
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit:
InOrderInvoker::runInvokerForTheSequence");
+ }
+
+ private synchronized boolean hasStoppedInvoking() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
+ log.debug("Exit: InOrderInvoker::hasStoppedInvoking, "
+ hasStopped);
+ }
+ return hasStopped;
+ }
+
+ public void run() {
+ if (log.isDebugEnabled())
+ log.debug("Enter: InOrderInvoker::run");
+
+ try {
+ internalRun();
+ } finally {
+ // flag that we have exited the run loop and notify any
waiting
+ // threads
+ synchronized (this) {
+ hasStopped = true;
+ notify();
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: InOrderInvoker::run");
+ }
+
+ private void internalRun() {
+ if (log.isDebugEnabled())
+ log.debug("Enter: InOrderInvoker::internalRun");
+
+ while (isInvokerStarted()) {
+
+ try {
+
Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);
+ } catch (InterruptedException ex) {
+ log.debug("Invoker was Inturrepted....");
+ log.debug(ex.getMessage());
+ }
+
+ Transaction transaction = null;
+ boolean rolebacked = false;
+
+ try {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(context, context
+ .getAxisConfiguration());
+ NextMsgBeanMgr nextMsgMgr =
storageManager.getNextMsgBeanMgr();
+
+ InvokerBeanMgr storageMapMgr =
storageManager.getStorageMapBeanMgr();
+
+ SequencePropertyBeanMgr sequencePropMgr =
storageManager.getSequencePropertyBeanMgr();
+
+ transaction = storageManager.getTransaction();
+
+ // Getting the incomingSequenceIdList
+ SequencePropertyBean allSequencesBean =
sequencePropMgr.retrieve(
+
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (allSequencesBean == null) {
+ if (log.isDebugEnabled())
+ log.debug("AllSequencesBean not
found");
+ continue;
+ }
+ ArrayList allSequencesList =
SandeshaUtil.getArrayListFromString(allSequencesBean.getValue());
+
+ Iterator allSequencesItr =
allSequencesList.iterator();
+
+// currentIteration: while
(allSequencesItr.hasNext()) {
+ String sequenceId = (String)
allSequencesItr.next();
+
+ NextMsgBean nextMsgBean =
nextMsgMgr.retrieve(sequenceId);
+ if (nextMsgBean == null) {
+ String message = "Next message
not set correctly. Removing invalid entry.";
+ log.debug(message);
+ allSequencesItr.remove();
+
+ // cleaning the invalid data of
the all sequences.
+
allSequencesBean.setValue(allSequencesList.toString());
+
sequencePropMgr.update(allSequencesBean);
+ continue;
+ }
+
+ long nextMsgno =
nextMsgBean.getNextMsgNoToProcess();
+ if (nextMsgno <= 0) {
+ if (log.isDebugEnabled())
+ log.debug("Invalid Next
Message Number " + nextMsgno);
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
+
.toString(nextMsgno));
+ throw new
SandeshaException(message);
+ }
+
+ Iterator stMapIt =
storageMapMgr.find(new InvokerBean(null, nextMsgno, sequenceId)).iterator();
+
+ if (stMapIt.hasNext()) {
+
+ InvokerBean invokerBean =
(InvokerBean) stMapIt.next();
+
+ transaction.commit();
+
+ //start a new worker thread and
let it do the invocation.
+ InvokerWorker worker = new
InvokerWorker (context,invokerBean);
+ threadPool.execute(worker);
+
+ }
+
+
+// }
+
+ } catch (Exception e) {
+ if (transaction != null) {
+ try {
+ transaction.rollback();
+ rolebacked = true;
+ } catch (Exception e1) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
+ .toString());
+ log.debug(message, e1);
+ }
+ }
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invokeMsgError);
+ log.debug(message, e);
+ } finally {
+ if (!rolebacked && transaction != null) {
+ try {
+ transaction.commit();
+ } catch (Exception e) {
+ String message =
SandeshaMessageHelper
+
.getMessage(SandeshaMessageKeys.commitError, e.toString());
+ log.debug(message, e);
+ }
+ }
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: InOrderInvoker::internalRun");
+ }
+
+}
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?rev=433150&view=auto
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
(added)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
Sun Aug 20 22:07:59 2006
@@ -0,0 +1,180 @@
+package org.apache.sandesha2.workers;
+
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+public class InvokerWorker implements Runnable {
+
+ ConfigurationContext configurationContext = null;
+ InvokerBean invokerBean = null;
+ Log log = LogFactory.getLog(InvokerWorker.class);
+
+ public InvokerWorker (ConfigurationContext configurationContext,
InvokerBean invokerBean) {
+ this.configurationContext = configurationContext;
+ this.invokerBean = invokerBean;
+ }
+
+ public void run() {
+
+ Transaction transaction = null;
+ MessageContext msgToInvoke = null;
+
+ try {
+
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ InvokerBeanMgr invokerBeanMgr =
storageManager.getStorageMapBeanMgr();
+ NextMsgBeanMgr nextMsgMgr =
storageManager.getNextMsgBeanMgr();
+
+ //starting a transaction
+ transaction = storageManager.getTransaction();
+
+ String key = invokerBean.getMessageContextRefKey();
+
+ msgToInvoke =
storageManager.retrieveMessageContext(key, configurationContext);
+ RMMsgContext rmMsg =
MsgInitializer.initializeMessage(msgToInvoke);
+
+ //endint the transaction before invocation.
+ transaction.commit();
+
+ boolean invoked = false;
+
+ long messageNo = invokerBean.getMsgNo();
+
+ try {
+
+ // Invocation is not done within a transation.
This
+ // may get changed when WS-AT is available.
+
+ // Invoking the message.
+
msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
+ Sandesha2Constants.VALUE_TRUE);
+
+ boolean postFailureInvocation = false;
+
+ // StorageManagers should st following property
to
+ // true, to indicate that the message received
comes
+ // after a failure.
+ String postFaulureProperty = (String)
msgToInvoke
+
.getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
+ if (postFaulureProperty != null
+ &&
Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
+ postFailureInvocation = true;
+
+ AxisEngine engine = new
AxisEngine(configurationContext);
+ if (postFailureInvocation) {
+
makeMessageReadyForReinjection(msgToInvoke);
+ if (log.isDebugEnabled())
+ log.debug("Receiving message,
key=" + key + ", msgCtx="
+ +
msgToInvoke.getEnvelope().getHeader());
+ engine.receive(msgToInvoke);
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Resuming message,
key=" + key + ", msgCtx="
+ +
msgToInvoke.getEnvelope().getHeader());
+ msgToInvoke.setPaused(false);
+ engine.resumeReceive(msgToInvoke);
+ }
+
+ invoked = true;
+
+ } catch (Exception e) {
+ if (log.isDebugEnabled())
+ log.debug("Exception :", e);
+
+ handleFault(msgToInvoke, e);
+
+ // throw new SandeshaException(e);
+ }
+
+ //starting a transaction for the post-invocation work.
+ transaction = storageManager.getTransaction();
+
+ // Service will be invoked only once. I.e. even if an
+ // exception get thrown in invocation
+ // the service will not be invoked again.
+ invokerBeanMgr.delete(key);
+
+ // removing the corresponding message context as well.
+ MessageContext msgCtx =
storageManager.retrieveMessageContext(key, configurationContext);
+ if (msgCtx != null) {
+ storageManager.removeMessageContext(key);
+ }
+
+ // updating the next msg to invoke
+
+ String sequenceId = invokerBean.getSequenceID();
+ NextMsgBean nextMsgBean =
nextMsgMgr.retrieve(sequenceId);
+
+
+ if (rmMsg.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
+ Sequence sequence = (Sequence) rmMsg
+
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequence.getLastMessage() != null) {
+
TerminateManager.cleanReceivingSideAfterInvocation(configurationContext,
sequenceId, storageManager);
+ // exit from current iteration. (since
an entry
+ // was removed)
+
+ return;
+ }
+ }
+
+ long nextMsgNo = nextMsgBean.getNextMsgNoToProcess();
+
+ if (!(messageNo==nextMsgNo)) {
+ String message = "Operated message number is
different from the Next Message Number to invoke";
+ throw new SandeshaException (message);
+ }
+
+ if (invoked) {
+ nextMsgNo++;
+ nextMsgBean.setNextMsgNoToProcess(nextMsgNo);
+ nextMsgMgr.update(nextMsgBean);
+ }
+ } catch (SandeshaStorageException e) {
+ transaction.rollback();
+ } catch (SandeshaException e) {
+ e.printStackTrace(); //TODO remove
+ log.error(e);
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error(e);
+ } finally {
+ if (transaction!=null && transaction.isActive())
+ transaction.commit();
+ }
+ }
+
+ private void makeMessageReadyForReinjection(MessageContext
messageContext) {
+
messageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, null);
+ messageContext.getOptions().setMessageId(null);
+ messageContext.getOptions().setTo(null);
+ messageContext.getOptions().setAction(null);
+
messageContext.setProperty(Sandesha2Constants.REINJECTED_MESSAGE,
Sandesha2Constants.VALUE_TRUE);
+ }
+
+ private void handleFault(MessageContext inMsgContext, Exception e)
throws Exception {
+ // msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
+ AxisEngine engine = new
AxisEngine(inMsgContext.getConfigurationContext());
+ MessageContext faultContext =
engine.createFaultMessageContext(inMsgContext, e);
+ engine.sendFault(faultContext);
+ }
+
+}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Sun Aug 20 22:07:59 2006
@@ -19,20 +19,11 @@
import java.util.ArrayList;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFault;
-import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.transport.TransportSender;
-import org.apache.axis2.transport.TransportUtils;
-import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.axis2.util.threadpool.ThreadPool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
@@ -41,13 +32,7 @@
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
-import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* This is responsible for sending and re-sending messages of Sandesha2. This
@@ -58,14 +43,17 @@
public class Sender extends Thread {
private boolean runSender = false;
-
private ArrayList workingSequences = new ArrayList();
-
private ConfigurationContext context = null;
-
private static final Log log = LogFactory.getLog(Sender.class);
-
private boolean hasStopped = false;
+
+ private transient ThreadFactory threadPool;
+ public int SENDER_THREADPOOL_SIZE = 5;
+
+ public Sender () {
+ threadPool = new ThreadPool
(SENDER_THREADPOOL_SIZE,SENDER_THREADPOOL_SIZE);
+ }
public synchronized void stopSenderForTheSequence(String sequenceID) {
if (log.isDebugEnabled())
@@ -180,142 +168,25 @@
log.debug("SenderBean not
found");
continue;
}
-
- String key =
senderBean.getMessageContextRefKey();
- MessageContext msgCtx =
storageManager.retrieveMessageContext(key, context);
-
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_TRUE);
-
- MessageRetransmissionAdjuster
retransmitterAdjuster = new MessageRetransmissionAdjuster();
- boolean continueSending =
retransmitterAdjuster.adjustRetransmittion(senderBean, context,
- storageManager);
- if (!continueSending) {
- continue;
- }
-
- if (msgCtx == null) {
- String message = "Message context is
not present in the storage";
- }
-
- // sender will not send the message if
following property is
- // set and not true.
- // But it will set if it is not set (null)
-
- // This is used to make sure that the mesage
get passed the
- // Sandesha2TransportSender.
-
- String qualifiedForSending = (String)
msgCtx.getProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING);
- if (qualifiedForSending != null &&
!qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
- continue;
- }
-
- if (msgCtx == null) {
-
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendHasUnavailableMsgEntry));
- break;
- }
-
- RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(msgCtx);
-
- // operation is the lowest level Sandesha2
should be attached
- ArrayList msgsNotToSend =
SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation()).getMsgTypesToDrop();
-
- if (msgsNotToSend != null &&
msgsNotToSend.contains(new Integer(rmMsgCtx.getMessageType()))) {
- continue;
- }
-
- updateMessage(msgCtx);
-
- int messageType = senderBean.getMessageType();
- if (messageType ==
Sandesha2Constants.MessageTypes.APPLICATION) {
- Sequence sequence = (Sequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceID =
sequence.getIdentifier().getIdentifier();
- }
-
- // checking weather this message can carry
piggybacked acks
- if (isAckPiggybackableMsgType(messageType)) {
- // piggybacking if an ack if available
for the same
- // sequence.
- // TODO do piggybacking based on wsa:To
-
AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager);
- }
-
- // sending the message
- TransportOutDescription transportOutDescription
= msgCtx.getTransportOut();
- TransportSender transportSender =
transportOutDescription.getSender();
-
- boolean successfullySent = false;
- if (transportSender != null) {
-
- // have to commit the transaction
before sending. This may
- // get changed when WS-AT is available.
- transaction.commit();
-
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_FALSE);
- try {
-
- // had to fully build the SOAP
envelope to support
- // retransmissions.
- // Otherwise a
'parserAlreadyAccessed' exception could
- // get thrown in
retransmissions.
- // But this has a performance
reduction.
- msgCtx.getEnvelope().build();
-
- if (log.isDebugEnabled())
- log.debug("Invoking
using transportSender " + transportSender + ", msgCtx="
- +
msgCtx.getEnvelope().getHeader());
- // TODO change this to cater
for security.
- transportSender.invoke(msgCtx);
- successfullySent = true;
- } catch (Exception e) {
- // TODO Auto-generated catch
block
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e
- .toString());
- log.debug(message, e);
- } finally {
- transaction =
storageManager.getTransaction();
-
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_TRUE);
- }
- }
-
- // update or delete only if the object is still
present.
- SenderBean bean1 =
mgr.retrieve(senderBean.getMessageID());
- if (bean1 != null) {
- if (senderBean.isReSend()) {
-
bean1.setSentCount(senderBean.getSentCount());
-
bean1.setTimeToSend(senderBean.getTimeToSend());
- mgr.update(bean1);
- } else {
-
mgr.delete(bean1.getMessageID());
-
- // removing the message from
the storage.
- String messageStoredKey =
bean1.getMessageContextRefKey();
-
storageManager.removeMessageContext(messageStoredKey);
- }
- }
-
- if (successfullySent) {
- if (!msgCtx.isServerSide())
- checkForSyncResponses(msgCtx);
- }
-
- if (rmMsgCtx.getMessageType() ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
- // terminate sending side.
- TerminateSequence terminateSequence =
(TerminateSequence) rmMsgCtx
-
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- String sequenceID =
terminateSequence.getIdentifier().getIdentifier();
- ConfigurationContext configContext =
msgCtx.getConfigurationContext();
-
- String internalSequenceID =
SandeshaUtil.getSequenceProperty(sequenceID,
-
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
-
TerminateManager.terminateSendingSide(configContext, internalSequenceID,
msgCtx.isServerSide(),
- storageManager);
- }
-
-
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_FALSE);
+
+ transaction.commit();
+
+
+
+ //start a worker which will work on this
message.s
+ SenderWorker worker = new SenderWorker
(context,senderBean);
+ threadPool.execute(worker);
+
} catch (Exception e) {
// TODO : when this is the client side throw
the exception to
// the client when necessary.
+
+ //TODO rollback only if a
SandeshaStorageException.
+ //This allows the other Exceptions to be used
within the Normal flow.
+
if (transaction != null) {
try {
transaction.rollback();
@@ -360,117 +231,6 @@
}
if (log.isDebugEnabled())
log.debug("Exit: Sender::runSenderForTheSequence");
- }
-
- private void updateMessage(MessageContext msgCtx1) throws
SandeshaException {
- // do updates if required.
- }
-
- private void checkForSyncResponses(MessageContext msgCtx) throws
SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::checkForSyncResponses, " +
msgCtx.getEnvelope().getHeader());
-
- try {
-
- boolean responsePresent =
(msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null);
- if (!responsePresent)
- return;
-
- // create the responseMessageContext
-
- MessageContext responseMessageContext = new
MessageContext();
- responseMessageContext.setServerSide(false);
-
responseMessageContext.setConfigurationContext(msgCtx.getConfigurationContext());
-
responseMessageContext.setTransportIn(msgCtx.getTransportIn());
-
responseMessageContext.setTransportOut(msgCtx.getTransportOut());
-
-
responseMessageContext.setProperty(MessageContext.TRANSPORT_IN, msgCtx
-
.getProperty(MessageContext.TRANSPORT_IN));
-
responseMessageContext.setServiceContext(msgCtx.getServiceContext());
-
responseMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
-
- // copying required properties from op. context to the
response msg
- // ctx.
- OperationContext requestMsgOpCtx =
msgCtx.getOperationContext();
- if (requestMsgOpCtx != null) {
- if
(responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE)
== null) {
-
responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE,
requestMsgOpCtx
-
.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE));
- }
-
- if
(responseMessageContext.getProperty(HTTPConstants.CHAR_SET_ENCODING) == null) {
-
responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING,
requestMsgOpCtx
-
.getProperty(HTTPConstants.CHAR_SET_ENCODING));
- }
- }
-
- // If request is REST we assume the
responseMessageContext is REST,
- // so set the variable
-
-
responseMessageContext.setDoingREST(msgCtx.isDoingREST());
-
- SOAPEnvelope resenvelope = null;
- try {
- resenvelope =
TransportUtils.createSOAPMessage(msgCtx,
msgCtx.getEnvelope().getNamespace().getName());
-
- } catch (AxisFault e) {
- // TODO Auto-generated catch block
-
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapEnvNotSet));
- log.debug(e.getStackTrace().toString());
- }
-
- // if the request msg ctx is withina a transaction,
processing if
- // the response should also happen
- // withing the same transaction
-
responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, msgCtx
-
.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
-
- if (resenvelope != null) {
- responseMessageContext.setEnvelope(resenvelope);
- AxisEngine engine = new
AxisEngine(msgCtx.getConfigurationContext());
-
- if (isFaultEnvelope(resenvelope)) {
-
engine.receiveFault(responseMessageContext);
- } else {
- engine.receive(responseMessageContext);
- }
- }
-
- } catch (Exception e) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
- log.debug(message, e);
- throw new SandeshaException(message, e);
- }
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::checkForSyncResponses");
- }
-
- private boolean isAckPiggybackableMsgType(int messageType) {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::isAckPiggybackableMsgType, "
+ messageType);
- boolean piggybackable = true;
-
- if (messageType == Sandesha2Constants.MessageTypes.ACK)
- piggybackable = false;
-
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::isAckPiggybackableMsgType, " +
piggybackable);
- return piggybackable;
- }
-
- private boolean isFaultEnvelope(SOAPEnvelope envelope) throws
SandeshaException {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::isFaultEnvelope, " +
envelope.getBody().getFault());
- SOAPFault fault = envelope.getBody().getFault();
- if (fault != null) {
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::isFaultEnvelope,
TRUE");
- return true;
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::isFaultEnvelope, FALSE");
- return false;
}
}
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?rev=433150&view=auto
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
(added)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Sun Aug 20 22:07:59 2006
@@ -0,0 +1,314 @@
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.MissingResourceException;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+public class SenderWorker implements Runnable {
+
+ ConfigurationContext configurationContext = null;
+ SenderBean senderBean = null;
+ Log log = LogFactory.getLog(SenderWorker.class);
+
+ public SenderWorker (ConfigurationContext configurationContext,
SenderBean senderBean) {
+ this.configurationContext = configurationContext;
+ this.senderBean = senderBean;
+ }
+
+ public void run () {
+
+ Transaction transaction = null;
+
+ try {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configurationContext,
configurationContext.getAxisConfiguration());
+ SenderBeanMgr senderBeanMgr =
storageManager.getRetransmitterBeanMgr();
+
+ transaction = storageManager.getTransaction();
+
+ String key = senderBean.getMessageContextRefKey();
+ MessageContext msgCtx =
storageManager.retrieveMessageContext(key, configurationContext);
+
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_TRUE);
+
+ MessageRetransmissionAdjuster retransmitterAdjuster =
new MessageRetransmissionAdjuster();
+ boolean continueSending =
retransmitterAdjuster.adjustRetransmittion(senderBean, configurationContext,
+ storageManager);
+ if (!continueSending) {
+ return;
+ }
+
+// if (msgCtx == null) {
+// String message = "Message context is not
present in the storage";
+// }
+
+ // sender will not send the message if following
property is
+ // set and not true.
+ // But it will set if it is not set (null)
+
+ // This is used to make sure that the mesage get passed
the
+ // Sandesha2TransportSender.
+
+ String qualifiedForSending = (String)
msgCtx.getProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING);
+ if (qualifiedForSending != null &&
!qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
+ return;
+ }
+
+ if (msgCtx == null) {
+
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendHasUnavailableMsgEntry));
+ return;
+ }
+
+ RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(msgCtx);
+
+ // operation is the lowest level Sandesha2 should be
attached
+ ArrayList msgsNotToSend =
SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation()).getMsgTypesToDrop();
+
+ if (msgsNotToSend != null && msgsNotToSend.contains(new
Integer(rmMsgCtx.getMessageType()))) {
+ return;
+ }
+
+ updateMessage(msgCtx);
+
+ int messageType = senderBean.getMessageType();
+// if (messageType ==
Sandesha2Constants.MessageTypes.APPLICATION) {
+// Sequence sequence = (Sequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+// String sequenceID =
sequence.getIdentifier().getIdentifier();
+// }
+
+ // checking weather this message can carry piggybacked
acks
+ if (isAckPiggybackableMsgType(messageType)) {
+ // piggybacking if an ack if available for the
same
+ // sequence.
+ // TODO do piggybacking based on wsa:To
+
AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager);
+ }
+
+ // sending the message
+ TransportOutDescription transportOutDescription =
msgCtx.getTransportOut();
+ TransportSender transportSender =
transportOutDescription.getSender();
+
+ boolean successfullySent = false;
+ if (transportSender != null) {
+
+ // have to commit the transaction before
sending. This may
+ // get changed when WS-AT is available.
+ transaction.commit();
+
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_FALSE);
+
+ try {
+
+ // had to fully build the SOAP envelope
to support
+ // retransmissions.
+ // Otherwise a 'parserAlreadyAccessed'
exception could
+ // get thrown in retransmissions.
+ // But this has a performance reduction.
+ msgCtx.getEnvelope().build();
+
+ if (log.isDebugEnabled())
+ log.debug("Invoking using
transportSender " + transportSender + ", msgCtx="
+ +
msgCtx.getEnvelope().getHeader());
+ // TODO change this to cater for
security.
+ transportSender.invoke(msgCtx);
+ successfullySent = true;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e
+ .toString());
+ log.debug(message, e);
+ } finally {
+ transaction =
storageManager.getTransaction();
+
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_TRUE);
+ }
+ }
+
+ // update or delete only if the object is still present.
+ SenderBean bean1 =
senderBeanMgr.retrieve(senderBean.getMessageID());
+ if (bean1 != null) {
+ if (senderBean.isReSend()) {
+
bean1.setSentCount(senderBean.getSentCount());
+
bean1.setTimeToSend(senderBean.getTimeToSend());
+ senderBeanMgr.update(bean1);
+ } else {
+
senderBeanMgr.delete(bean1.getMessageID());
+
+ // removing the message from the
storage.
+ String messageStoredKey =
bean1.getMessageContextRefKey();
+
storageManager.removeMessageContext(messageStoredKey);
+ }
+ }
+
+ if (successfullySent) {
+ if (!msgCtx.isServerSide())
+ checkForSyncResponses(msgCtx);
+ }
+
+ if (rmMsgCtx.getMessageType() ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+ // terminate sending side.
+ TerminateSequence terminateSequence =
(TerminateSequence) rmMsgCtx
+
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ String sequenceID =
terminateSequence.getIdentifier().getIdentifier();
+ ConfigurationContext configContext =
msgCtx.getConfigurationContext();
+
+ String internalSequenceID =
SandeshaUtil.getSequenceProperty(sequenceID,
+
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
+
TerminateManager.terminateSendingSide(configContext, internalSequenceID,
msgCtx.isServerSide(),
+ storageManager);
+ }
+
+
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_FALSE);
+ } catch (SandeshaStorageException e) {
+ if (transaction!=null && transaction.isActive())
+ transaction.rollback();
+ } catch (SandeshaException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (MissingResourceException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally {
+ if (transaction!=null && transaction.isActive())
+ transaction.commit();
+ }
+ }
+
+ private void updateMessage(MessageContext msgCtx1) throws
SandeshaException {
+ // do updates if required.
+ }
+
+ private boolean isAckPiggybackableMsgType(int messageType) {
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::isAckPiggybackableMsgType, "
+ messageType);
+ boolean piggybackable = true;
+
+ if (messageType == Sandesha2Constants.MessageTypes.ACK)
+ piggybackable = false;
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::isAckPiggybackableMsgType, " +
piggybackable);
+ return piggybackable;
+ }
+
+ private void checkForSyncResponses(MessageContext msgCtx) throws
SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::checkForSyncResponses, " +
msgCtx.getEnvelope().getHeader());
+
+ try {
+
+ boolean responsePresent =
(msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null);
+ if (!responsePresent)
+ return;
+
+ // create the responseMessageContext
+
+ MessageContext responseMessageContext = new
MessageContext();
+ responseMessageContext.setServerSide(false);
+
responseMessageContext.setConfigurationContext(msgCtx.getConfigurationContext());
+
responseMessageContext.setTransportIn(msgCtx.getTransportIn());
+
responseMessageContext.setTransportOut(msgCtx.getTransportOut());
+
+
responseMessageContext.setProperty(MessageContext.TRANSPORT_IN, msgCtx
+
.getProperty(MessageContext.TRANSPORT_IN));
+
responseMessageContext.setServiceContext(msgCtx.getServiceContext());
+
responseMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
+
+ // copying required properties from op. context to the
response msg
+ // ctx.
+ OperationContext requestMsgOpCtx =
msgCtx.getOperationContext();
+ if (requestMsgOpCtx != null) {
+ if
(responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE)
== null) {
+
responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE,
requestMsgOpCtx
+
.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE));
+ }
+
+ if
(responseMessageContext.getProperty(HTTPConstants.CHAR_SET_ENCODING) == null) {
+
responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING,
requestMsgOpCtx
+
.getProperty(HTTPConstants.CHAR_SET_ENCODING));
+ }
+ }
+
+ // If request is REST we assume the
responseMessageContext is REST,
+ // so set the variable
+
+
responseMessageContext.setDoingREST(msgCtx.isDoingREST());
+
+ SOAPEnvelope resenvelope = null;
+ try {
+ resenvelope =
TransportUtils.createSOAPMessage(msgCtx,
msgCtx.getEnvelope().getNamespace().getName());
+
+ } catch (AxisFault e) {
+ // TODO Auto-generated catch block
+
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapEnvNotSet));
+ log.debug(e.getStackTrace().toString());
+ }
+
+ // if the request msg ctx is withina a transaction,
processing if
+ // the response should also happen
+ // withing the same transaction
+
responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, msgCtx
+
.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
+
+ if (resenvelope != null) {
+ responseMessageContext.setEnvelope(resenvelope);
+ AxisEngine engine = new
AxisEngine(msgCtx.getConfigurationContext());
+
+ if (isFaultEnvelope(resenvelope)) {
+
engine.receiveFault(responseMessageContext);
+ } else {
+ engine.receive(responseMessageContext);
+ }
+ }
+
+ } catch (Exception e) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+ log.debug(message, e);
+ throw new SandeshaException(message, e);
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::checkForSyncResponses");
+ }
+
+ private boolean isFaultEnvelope(SOAPEnvelope envelope) throws
SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::isFaultEnvelope, " +
envelope.getBody().getFault());
+ SOAPFault fault = envelope.getBody().getFault();
+ if (fault != null) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::isFaultEnvelope,
TRUE");
+ return true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::isFaultEnvelope, FALSE");
+ return false;
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]