Author: chamikara
Date: Sun Aug 20 22:07:59 2006
New Revision: 433150

URL: http://svn.apache.org/viewvc?rev=433150&view=rev
Log:
Made the Sender and Invoker manage thread pools. The pool size is still 1
(I hope to move this into a policy after some testing).
This will allow sandesha2 to send and invoke multiple messages concurrently.
This also eliminates some deadlock scenarios which existed before.

The Sender and Invoker classes manage the thread pools.
SenderWorker and InvokerWorker do the sending and invoking of a single 
message, respectively.
(The old InOrderInvoker class was removed.)


Added:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Removed:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InOrderInvoker.java
Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/Transaction.java
 Sun Aug 20 22:07:59 2006
@@ -27,4 +27,6 @@
        
        public void rollback ();
        
+       //indicates that the transaction has been started, but has not been 
committed or rolledbacked yet.
+       public boolean isActive ();
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 Sun Aug 20 22:07:59 2006
@@ -33,5 +33,9 @@
        public void rollback() {
 
        }
+       
+       public boolean isActive () {
+               return false;
+       }
 
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
Sun Aug 20 22:07:59 2006
@@ -65,7 +65,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.workers.InOrderInvoker;
+import org.apache.sandesha2.workers.Invoker;
 import org.apache.sandesha2.workers.Sender;
 import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
@@ -230,24 +230,24 @@
 
        public static void startInvokerForTheSequence(ConfigurationContext 
context, String sequenceID) {
                
-               InOrderInvoker invoker = (InOrderInvoker) 
context.getProperty(Sandesha2Constants.INVOKER);
+               Invoker invoker = (Invoker) 
context.getProperty(Sandesha2Constants.INVOKER);
                if (invoker!=null)
                        invoker.runInvokerForTheSequence(context,sequenceID);
                else {
-                       invoker = new InOrderInvoker ();
+                       invoker = new Invoker ();
                        context.setProperty(Sandesha2Constants.INVOKER,invoker);
                        invoker.runInvokerForTheSequence(context,sequenceID);
                }
        }
 
        private static void stopInvokerForTheSequence(String sequenceID, 
ConfigurationContext context) {
-               InOrderInvoker invoker = (InOrderInvoker) 
context.getProperty(Sandesha2Constants.INVOKER);
+               Invoker invoker = (Invoker) 
context.getProperty(Sandesha2Constants.INVOKER);
                if (invoker!=null)
                        invoker.stopInvokerForTheSequence(sequenceID);
        }
        
        public static void stopInvoker(ConfigurationContext context) {
-               InOrderInvoker invoker = (InOrderInvoker) 
context.getProperty(Sandesha2Constants.INVOKER);
+               Invoker invoker = (Invoker) 
context.getProperty(Sandesha2Constants.INVOKER);
                if (invoker!=null)
                        invoker.stopInvoking();
        }

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?rev=433150&view=auto
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
(added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
Sun Aug 20 22:07:59 2006
@@ -0,0 +1,266 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.axis2.util.threadpool.ThreadPool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * This is used when InOrder invocation is required. This is a seperated Thread
+ * that keep running all the time. At each iteration it checks the InvokerTable
+ * to find weather there are any messages to me invoked.
+ */
+
+public class Invoker extends Thread {
+
+       private boolean runInvoker = false;
+       private ArrayList workingSequences = new ArrayList();
+       private ConfigurationContext context = null;
+       private static final Log log = LogFactory.getLog(Invoker.class);
+       private boolean hasStopped = false;
+       
+    private transient ThreadFactory threadPool;
+    public int INVOKER_THREADPOOL_SIZE =5;
+    
+    public Invoker () {
+       threadPool = new ThreadPool 
(INVOKER_THREADPOOL_SIZE,INVOKER_THREADPOOL_SIZE);
+    }
+
+       public synchronized void stopInvokerForTheSequence(String sequenceID) {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: 
InOrderInvoker::stopInvokerForTheSequence, " + sequenceID);
+
+               workingSequences.remove(sequenceID);
+               if (workingSequences.size() == 0) {
+                       runInvoker = false;
+               }
+
+               if (log.isDebugEnabled())
+                       log.debug("Exit: 
InOrderInvoker::stopInvokerForTheSequence");
+       }
+
+       public synchronized void stopInvoking() {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: InOrderInvoker::stopInvoking");
+
+               if (isInvokerStarted()) {
+                       // the invoker is started so stop it
+                       runInvoker = false;
+                       // wait for it to finish
+                       while (!hasStoppedInvoking()) {
+                               try {
+                                       
wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
+                               } catch (InterruptedException e1) {
+                                       log.debug(e1.getMessage());
+                               }
+                       }
+               }
+
+               if (log.isDebugEnabled())
+                       log.debug("Exit: InOrderInvoker::stopInvoking");
+       }
+
+       public synchronized boolean isInvokerStarted() {
+               if (log.isDebugEnabled()) {
+                       log.debug("Enter: InOrderInvoker::isInvokerStarted");
+                       log.debug("Exit: InOrderInvoker::isInvokerStarted, " + 
runInvoker);
+               }
+               return runInvoker;
+       }
+
+       public synchronized void runInvokerForTheSequence(ConfigurationContext 
context, String sequenceID) {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: 
InOrderInvoker::runInvokerForTheSequence");
+
+               if (!workingSequences.contains(sequenceID))
+                       workingSequences.add(sequenceID);
+
+               if (!isInvokerStarted()) {
+                       this.context = context;
+                       runInvoker = true; // so that isSenderStarted()=true.
+                       super.start();
+               }
+               if (log.isDebugEnabled())
+                       log.debug("Exit: 
InOrderInvoker::runInvokerForTheSequence");
+       }
+
+       private synchronized boolean hasStoppedInvoking() {
+               if (log.isDebugEnabled()) {
+                       log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
+                       log.debug("Exit: InOrderInvoker::hasStoppedInvoking, " 
+ hasStopped);
+               }
+               return hasStopped;
+       }
+
+       public void run() {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: InOrderInvoker::run");
+
+               try {
+                       internalRun();
+               } finally {
+                       // flag that we have exited the run loop and notify any 
waiting
+                       // threads
+                       synchronized (this) {
+                               hasStopped = true;
+                               notify();
+                       }
+               }
+
+               if (log.isDebugEnabled())
+                       log.debug("Exit: InOrderInvoker::run");
+       }
+
+       private void internalRun() {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: InOrderInvoker::internalRun");
+
+               while (isInvokerStarted()) {
+
+                       try {
+                               
Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);
+                       } catch (InterruptedException ex) {
+                               log.debug("Invoker was Inturrepted....");
+                               log.debug(ex.getMessage());
+                       }
+
+                       Transaction transaction = null;
+                       boolean rolebacked = false;
+
+                       try {
+                               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context, context
+                                               .getAxisConfiguration());
+                               NextMsgBeanMgr nextMsgMgr = 
storageManager.getNextMsgBeanMgr();
+
+                               InvokerBeanMgr storageMapMgr = 
storageManager.getStorageMapBeanMgr();
+
+                               SequencePropertyBeanMgr sequencePropMgr = 
storageManager.getSequencePropertyBeanMgr();
+
+                               transaction = storageManager.getTransaction();
+
+                               // Getting the incomingSequenceIdList
+                               SequencePropertyBean allSequencesBean = 
sequencePropMgr.retrieve(
+                                               
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+                                               
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+                               if (allSequencesBean == null) {
+                                       if (log.isDebugEnabled())
+                                               log.debug("AllSequencesBean not 
found");
+                                       continue;
+                               }
+                               ArrayList allSequencesList = 
SandeshaUtil.getArrayListFromString(allSequencesBean.getValue());
+
+                               Iterator allSequencesItr = 
allSequencesList.iterator();
+
+//                             currentIteration: while 
(allSequencesItr.hasNext()) {
+                                       String sequenceId = (String) 
allSequencesItr.next();
+
+                                       NextMsgBean nextMsgBean = 
nextMsgMgr.retrieve(sequenceId);
+                                       if (nextMsgBean == null) {
+                                               String message = "Next message 
not set correctly. Removing invalid entry.";
+                                               log.debug(message);
+                                               allSequencesItr.remove();
+
+                                               // cleaning the invalid data of 
the all sequences.
+                                               
allSequencesBean.setValue(allSequencesList.toString());
+                                               
sequencePropMgr.update(allSequencesBean);
+                                               continue;
+                                       }
+
+                                       long nextMsgno = 
nextMsgBean.getNextMsgNoToProcess();
+                                       if (nextMsgno <= 0) {
+                                               if (log.isDebugEnabled())
+                                                       log.debug("Invalid Next 
Message Number " + nextMsgno);
+                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
+                                                               
.toString(nextMsgno));
+                                               throw new 
SandeshaException(message);
+                                       }
+
+                                       Iterator stMapIt = 
storageMapMgr.find(new InvokerBean(null, nextMsgno, sequenceId)).iterator();
+
+                                       if (stMapIt.hasNext()) {
+
+                                               InvokerBean invokerBean = 
(InvokerBean) stMapIt.next();
+
+                                               transaction.commit();
+                                               
+                                               //start a new worker thread and 
let it do the invocation.
+                                               InvokerWorker worker = new 
InvokerWorker (context,invokerBean);
+                                               threadPool.execute(worker);
+                                               
+                                       }
+
+
+//                             }
+
+                       } catch (Exception e) {
+                               if (transaction != null) {
+                                       try {
+                                               transaction.rollback();
+                                               rolebacked = true;
+                                       } catch (Exception e1) {
+                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
+                                                               .toString());
+                                               log.debug(message, e1);
+                                       }
+                               }
+                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invokeMsgError);
+                               log.debug(message, e);
+                       } finally {
+                               if (!rolebacked && transaction != null) {
+                                       try {
+                                               transaction.commit();
+                                       } catch (Exception e) {
+                                               String message = 
SandeshaMessageHelper
+                                                               
.getMessage(SandeshaMessageKeys.commitError, e.toString());
+                                               log.debug(message, e);
+                                       }
+                               }
+                       }
+               }
+               if (log.isDebugEnabled())
+                       log.debug("Exit: InOrderInvoker::internalRun");
+       }
+
+}

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?rev=433150&view=auto
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
 (added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
 Sun Aug 20 22:07:59 2006
@@ -0,0 +1,180 @@
+package org.apache.sandesha2.workers;
+
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+
+public class InvokerWorker implements Runnable {
+
+       ConfigurationContext configurationContext = null;
+       InvokerBean invokerBean = null;
+       Log log = LogFactory.getLog(InvokerWorker.class);
+       
+       public InvokerWorker (ConfigurationContext configurationContext, 
InvokerBean invokerBean) {
+               this.configurationContext = configurationContext;
+               this.invokerBean = invokerBean;
+       }
+       
+       public void run() {
+               
+               Transaction transaction = null;
+               MessageContext msgToInvoke = null;
+               
+               try {
+                       
+                       StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+                       InvokerBeanMgr invokerBeanMgr = 
storageManager.getStorageMapBeanMgr();
+                       NextMsgBeanMgr nextMsgMgr = 
storageManager.getNextMsgBeanMgr();
+                       
+                       //starting a transaction
+                       transaction = storageManager.getTransaction();
+                       
+                       String key = invokerBean.getMessageContextRefKey();
+                       
+                       msgToInvoke = 
storageManager.retrieveMessageContext(key, configurationContext);
+                       RMMsgContext rmMsg = 
MsgInitializer.initializeMessage(msgToInvoke);
+
+                       //endint the transaction before invocation.
+                       transaction.commit();
+                               
+                       boolean invoked = false;
+                       
+                       long messageNo = invokerBean.getMsgNo();
+                       
+                       try {
+
+                               // Invocation is not done within a transation. 
This
+                               // may get changed when WS-AT is available.
+                               
+                               // Invoking the message.
+                               
msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
+                                               Sandesha2Constants.VALUE_TRUE);
+
+                               boolean postFailureInvocation = false;
+
+                               // StorageManagers should st following property 
to
+                               // true, to indicate that the message received 
comes
+                               // after a failure.
+                               String postFaulureProperty = (String) 
msgToInvoke
+                                               
.getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
+                               if (postFaulureProperty != null
+                                               && 
Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
+                                       postFailureInvocation = true;
+
+                               AxisEngine engine = new 
AxisEngine(configurationContext);
+                               if (postFailureInvocation) {
+                                       
makeMessageReadyForReinjection(msgToInvoke);
+                                       if (log.isDebugEnabled())
+                                               log.debug("Receiving message, 
key=" + key + ", msgCtx="
+                                                               + 
msgToInvoke.getEnvelope().getHeader());
+                                       engine.receive(msgToInvoke);
+                               } else {
+                                       if (log.isDebugEnabled())
+                                               log.debug("Resuming message, 
key=" + key + ", msgCtx="
+                                                               + 
msgToInvoke.getEnvelope().getHeader());
+                                       msgToInvoke.setPaused(false);
+                                       engine.resumeReceive(msgToInvoke);
+                               }
+                               
+                               invoked = true;
+
+                       } catch (Exception e) {
+                               if (log.isDebugEnabled())
+                                       log.debug("Exception :", e);
+
+                               handleFault(msgToInvoke, e);
+
+                               // throw new SandeshaException(e);
+                       }
+                               
+                       //starting a transaction for the post-invocation work.
+                       transaction = storageManager.getTransaction();
+                       
+                       // Service will be invoked only once. I.e. even if an
+                       // exception get thrown in invocation
+                       // the service will not be invoked again.
+                       invokerBeanMgr.delete(key);
+
+                       // removing the corresponding message context as well.
+                       MessageContext msgCtx = 
storageManager.retrieveMessageContext(key, configurationContext);
+                       if (msgCtx != null) {
+                               storageManager.removeMessageContext(key);
+                       }
+
+                       // updating the next msg to invoke
+
+                       String sequenceId = invokerBean.getSequenceID();
+                       NextMsgBean nextMsgBean = 
nextMsgMgr.retrieve(sequenceId);
+
+                       
+                       if (rmMsg.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION) {
+                               Sequence sequence = (Sequence) rmMsg
+                                               
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+                               if (sequence.getLastMessage() != null) {
+                                       
TerminateManager.cleanReceivingSideAfterInvocation(configurationContext, 
sequenceId, storageManager);
+                                       // exit from current iteration. (since 
an entry
+                                       // was removed)
+                                       
+                                       return;
+                               }
+                       }
+                       
+                       long nextMsgNo = nextMsgBean.getNextMsgNoToProcess();
+                       
+                       if (!(messageNo==nextMsgNo)) {
+                               String message = "Operated message number is 
different from the Next Message Number to invoke";
+                               throw new SandeshaException (message);
+                       }
+                       
+                       if (invoked) {
+                               nextMsgNo++;
+                               nextMsgBean.setNextMsgNoToProcess(nextMsgNo);
+                               nextMsgMgr.update(nextMsgBean);
+                       }
+               } catch (SandeshaStorageException e) {
+                       transaction.rollback();
+               } catch (SandeshaException e) {
+                       e.printStackTrace(); //TODO remove
+                       log.error(e);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       log.error(e);
+               } finally {
+                       if (transaction!=null && transaction.isActive())
+                               transaction.commit();
+               }
+       }
+
+       private void makeMessageReadyForReinjection(MessageContext 
messageContext) {
+               
messageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, null);
+               messageContext.getOptions().setMessageId(null);
+               messageContext.getOptions().setTo(null);
+               messageContext.getOptions().setAction(null);
+               
messageContext.setProperty(Sandesha2Constants.REINJECTED_MESSAGE, 
Sandesha2Constants.VALUE_TRUE);
+       }
+
+       private void handleFault(MessageContext inMsgContext, Exception e) 
throws Exception {
+               // msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
+               AxisEngine engine = new 
AxisEngine(inMsgContext.getConfigurationContext());
+               MessageContext faultContext = 
engine.createFaultMessageContext(inMsgContext, e);
+               engine.sendFault(faultContext);
+       }
+       
+}

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=433150&r1=433149&r2=433150&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
Sun Aug 20 22:07:59 2006
@@ -19,20 +19,11 @@
 
 import java.util.ArrayList;
 
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFault;
-import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.transport.TransportSender;
-import org.apache.axis2.transport.TransportUtils;
-import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.axis2.util.threadpool.ThreadPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
@@ -41,13 +32,7 @@
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
-import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
 
 /**
  * This is responsible for sending and re-sending messages of Sandesha2. This
@@ -58,14 +43,17 @@
 public class Sender extends Thread {
 
        private boolean runSender = false;
-
        private ArrayList workingSequences = new ArrayList();
-
        private ConfigurationContext context = null;
-
        private static final Log log = LogFactory.getLog(Sender.class);
-
        private boolean hasStopped = false;
+       
+    private transient ThreadFactory threadPool;
+    public int SENDER_THREADPOOL_SIZE = 5;
+    
+    public Sender () {
+       threadPool = new ThreadPool 
(SENDER_THREADPOOL_SIZE,SENDER_THREADPOOL_SIZE);
+    }
 
        public synchronized void stopSenderForTheSequence(String sequenceID) {
                if (log.isDebugEnabled())
@@ -180,142 +168,25 @@
                                                log.debug("SenderBean not 
found");
                                        continue;
                                }
-
-                               String key = 
senderBean.getMessageContextRefKey();
-                               MessageContext msgCtx = 
storageManager.retrieveMessageContext(key, context);
-                               
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_TRUE);
-
-                               MessageRetransmissionAdjuster 
retransmitterAdjuster = new MessageRetransmissionAdjuster();
-                               boolean continueSending = 
retransmitterAdjuster.adjustRetransmittion(senderBean, context,
-                                               storageManager);
-                               if (!continueSending) {
-                                       continue;
-                               }
-
-                               if (msgCtx == null) {
-                                       String message = "Message context is 
not present in the storage";
-                               }
-
-                               // sender will not send the message if 
following property is
-                               // set and not true.
-                               // But it will set if it is not set (null)
-
-                               // This is used to make sure that the mesage 
get passed the
-                               // Sandesha2TransportSender.
-
-                               String qualifiedForSending = (String) 
msgCtx.getProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING);
-                               if (qualifiedForSending != null && 
!qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
-                                       continue;
-                               }
-
-                               if (msgCtx == null) {
-                                       
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendHasUnavailableMsgEntry));
-                                       break;
-                               }
-
-                               RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(msgCtx);
-
-                               // operation is the lowest level Sandesha2 
should be attached
-                               ArrayList msgsNotToSend = 
SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation()).getMsgTypesToDrop();
-
-                               if (msgsNotToSend != null && 
msgsNotToSend.contains(new Integer(rmMsgCtx.getMessageType()))) {
-                                       continue;
-                               }
-
-                               updateMessage(msgCtx);
-
-                               int messageType = senderBean.getMessageType();
-                               if (messageType == 
Sandesha2Constants.MessageTypes.APPLICATION) {
-                                       Sequence sequence = (Sequence) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-                                       String sequenceID = 
sequence.getIdentifier().getIdentifier();
-                               }
-
-                               // checking weather this message can carry 
piggybacked acks
-                               if (isAckPiggybackableMsgType(messageType)) {
-                                       // piggybacking if an ack if available 
for the same
-                                       // sequence.
-                                       // TODO do piggybacking based on wsa:To
-                                       
AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager);
-                               }
-
-                               // sending the message
-                               TransportOutDescription transportOutDescription 
= msgCtx.getTransportOut();
-                               TransportSender transportSender = 
transportOutDescription.getSender();
-
-                               boolean successfullySent = false;
-                               if (transportSender != null) {
-
-                                       // have to commit the transaction 
before sending. This may
-                                       // get changed when WS-AT is available.
-                                       transaction.commit();
-                                       
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_FALSE);
-                                       try {
-
-                                               // had to fully build the SOAP 
envelope to support
-                                               // retransmissions.
-                                               // Otherwise a 
'parserAlreadyAccessed' exception could
-                                               // get thrown in 
retransmissions.
-                                               // But this has a performance 
reduction.
-                                               msgCtx.getEnvelope().build();
-
-                                               if (log.isDebugEnabled())
-                                                       log.debug("Invoking 
using transportSender " + transportSender + ", msgCtx="
-                                                                       + 
msgCtx.getEnvelope().getHeader());
-                                               // TODO change this to cater 
for security.
-                                               transportSender.invoke(msgCtx);
-                                               successfullySent = true;
-                                       } catch (Exception e) {
-                                               // TODO Auto-generated catch 
block
-                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e
-                                                               .toString());
-                                               log.debug(message, e);
-                                       } finally {
-                                               transaction = 
storageManager.getTransaction();
-                                               
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_TRUE);
-                                       }
-                               }
-
-                               // update or delete only if the object is still 
present.
-                               SenderBean bean1 = 
mgr.retrieve(senderBean.getMessageID());
-                               if (bean1 != null) {
-                                       if (senderBean.isReSend()) {
-                                               
bean1.setSentCount(senderBean.getSentCount());
-                                               
bean1.setTimeToSend(senderBean.getTimeToSend());
-                                               mgr.update(bean1);
-                                       } else {
-                                               
mgr.delete(bean1.getMessageID());
-
-                                               // removing the message from 
the storage.
-                                               String messageStoredKey = 
bean1.getMessageContextRefKey();
-                                               
storageManager.removeMessageContext(messageStoredKey);
-                                       }
-                               }
-
-                               if (successfullySent) {
-                                       if (!msgCtx.isServerSide())
-                                               checkForSyncResponses(msgCtx);
-                               }
-
-                               if (rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-                                       // terminate sending side.
-                                       TerminateSequence terminateSequence = 
(TerminateSequence) rmMsgCtx
-                                                       
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-                                       String sequenceID = 
terminateSequence.getIdentifier().getIdentifier();
-                                       ConfigurationContext configContext = 
msgCtx.getConfigurationContext();
-
-                                       String internalSequenceID = 
SandeshaUtil.getSequenceProperty(sequenceID,
-                                                       
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
-                                       
TerminateManager.terminateSendingSide(configContext, internalSequenceID, 
msgCtx.isServerSide(),
-                                                       storageManager);
-                               }
-
-                               
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_FALSE);
+                               
+                               transaction.commit();
+                               
+                               
+                               
+                               //start a worker which will work on this 
message.s
+                               SenderWorker worker = new SenderWorker 
(context,senderBean);
+                               threadPool.execute(worker);
+                               
 
                        } catch (Exception e) {
 
                                // TODO : when this is the client side throw 
the exception to
                                // the client when necessary.
 
+                               
+                               //TODO rollback only if a 
SandeshaStorageException.
+                               //This allows the other Exceptions to be used 
within the Normal flow.
+                               
                                if (transaction != null) {
                                        try {
                                                transaction.rollback();
@@ -360,117 +231,6 @@
                }
                if (log.isDebugEnabled())
                        log.debug("Exit: Sender::runSenderForTheSequence");
-       }
-
-       private void updateMessage(MessageContext msgCtx1) throws 
SandeshaException {
-               // do updates if required.
-       }
-
-       private void checkForSyncResponses(MessageContext msgCtx) throws 
SandeshaException {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: Sender::checkForSyncResponses, " + 
msgCtx.getEnvelope().getHeader());
-
-               try {
-
-                       boolean responsePresent = 
(msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null);
-                       if (!responsePresent)
-                               return;
-
-                       // create the responseMessageContext
-
-                       MessageContext responseMessageContext = new 
MessageContext();
-                       responseMessageContext.setServerSide(false);
-                       
responseMessageContext.setConfigurationContext(msgCtx.getConfigurationContext());
-                       
responseMessageContext.setTransportIn(msgCtx.getTransportIn());
-                       
responseMessageContext.setTransportOut(msgCtx.getTransportOut());
-
-                       
responseMessageContext.setProperty(MessageContext.TRANSPORT_IN, msgCtx
-                                       
.getProperty(MessageContext.TRANSPORT_IN));
-                       
responseMessageContext.setServiceContext(msgCtx.getServiceContext());
-                       
responseMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
-
-                       // copying required properties from op. context to the 
response msg
-                       // ctx.
-                       OperationContext requestMsgOpCtx = 
msgCtx.getOperationContext();
-                       if (requestMsgOpCtx != null) {
-                               if 
(responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE) 
== null) {
-                                       
responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE, 
requestMsgOpCtx
-                                                       
.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE));
-                               }
-
-                               if 
(responseMessageContext.getProperty(HTTPConstants.CHAR_SET_ENCODING) == null) {
-                                       
responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING, 
requestMsgOpCtx
-                                                       
.getProperty(HTTPConstants.CHAR_SET_ENCODING));
-                               }
-                       }
-
-                       // If request is REST we assume the 
responseMessageContext is REST,
-                       // so set the variable
-
-                       
responseMessageContext.setDoingREST(msgCtx.isDoingREST());
-
-                       SOAPEnvelope resenvelope = null;
-                       try {
-                               resenvelope = 
TransportUtils.createSOAPMessage(msgCtx, 
msgCtx.getEnvelope().getNamespace().getName());
-
-                       } catch (AxisFault e) {
-                               // TODO Auto-generated catch block
-                               
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapEnvNotSet));
-                               log.debug(e.getStackTrace().toString());
-                       }
-
-                       // if the request msg ctx is withina a transaction, 
processing if
-                       // the response should also happen
-                       // withing the same transaction
-                       
responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, msgCtx
-                                       
.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
-
-                       if (resenvelope != null) {
-                               responseMessageContext.setEnvelope(resenvelope);
-                               AxisEngine engine = new 
AxisEngine(msgCtx.getConfigurationContext());
-
-                               if (isFaultEnvelope(resenvelope)) {
-                                       
engine.receiveFault(responseMessageContext);
-                               } else {
-                                       engine.receive(responseMessageContext);
-                               }
-                       }
-
-               } catch (Exception e) {
-                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
-                       log.debug(message, e);
-                       throw new SandeshaException(message, e);
-               }
-               if (log.isDebugEnabled())
-                       log.debug("Exit: Sender::checkForSyncResponses");
-       }
-
-       private boolean isAckPiggybackableMsgType(int messageType) {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: Sender::isAckPiggybackableMsgType, " 
+ messageType);
-               boolean piggybackable = true;
-
-               if (messageType == Sandesha2Constants.MessageTypes.ACK)
-                       piggybackable = false;
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: Sender::isAckPiggybackableMsgType, " + 
piggybackable);
-               return piggybackable;
-       }
-
-       private boolean isFaultEnvelope(SOAPEnvelope envelope) throws 
SandeshaException {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: Sender::isFaultEnvelope, " + 
envelope.getBody().getFault());
-               SOAPFault fault = envelope.getBody().getFault();
-               if (fault != null) {
-                       if (log.isDebugEnabled())
-                               log.debug("Exit: Sender::isFaultEnvelope, 
TRUE");
-                       return true;
-               }
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: Sender::isFaultEnvelope, FALSE");
-               return false;
        }
 
 }

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?rev=433150&view=auto
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 (added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 Sun Aug 20 22:07:59 2006
@@ -0,0 +1,314 @@
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.MissingResourceException;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+public class SenderWorker implements Runnable {
+
+       ConfigurationContext configurationContext = null;
+       SenderBean senderBean = null;
+       Log log = LogFactory.getLog(SenderWorker.class);
+       
+       /**
+        * Creates a worker bound to one sender entry.
+        *
+        * @param configurationContext context used by run() to look up the storage manager
+        *                             and to retrieve the stored message context
+        * @param senderBean           the sender entry this worker will process
+        */
+       public SenderWorker (ConfigurationContext configurationContext, SenderBean senderBean) {
+               this.senderBean = senderBean;
+               this.configurationContext = configurationContext;
+       }
+       
+       /**
+        * Processes the single sender entry given to this worker.
+        *
+        * The stored message context is loaded, the retransmission adjuster is
+        * consulted, acknowledgements are piggybacked where the message type allows
+        * it, and the message is handed to its transport sender. Afterwards the
+        * sender entry is either updated (re-send entries) or deleted together with
+        * its stored message context. A successfully sent client side message is
+        * checked for a synchronous response, and a TerminateSequence message
+        * triggers termination of the sending side.
+        *
+        * A SandeshaStorageException rolls back the active transaction. Any other
+        * handled exception is logged, and the finally block commits whatever
+        * transaction is still active.
+        */
+       public void run () {
+
+               Transaction transaction = null;
+
+               try {
+                       StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
+                       SenderBeanMgr senderBeanMgr = storageManager.getRetransmitterBeanMgr();
+
+                       transaction = storageManager.getTransaction();
+
+                       String key = senderBean.getMessageContextRefKey();
+                       MessageContext msgCtx = storageManager.retrieveMessageContext(key, configurationContext);
+
+                       // This check must come before the first use of msgCtx. Previously
+                       // it was placed after msgCtx had already been dereferenced, so a
+                       // missing message ended in a NullPointerException instead of here.
+                       if (msgCtx == null) {
+                               log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendHasUnavailableMsgEntry));
+                               return;
+                       }
+
+                       msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+
+                       MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
+                       boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, configurationContext,
+                                       storageManager);
+                       if (!continueSending) {
+                               return;
+                       }
+
+                       // The sender will not send the message if the following property
+                       // is set to anything other than true. A message for which it is
+                       // not set at all (null) is still sent.
+                       // This is used to make sure that the message has passed through
+                       // the Sandesha2TransportSender.
+                       String qualifiedForSending = (String) msgCtx.getProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING);
+                       if (qualifiedForSending != null && !qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
+                               return;
+                       }
+
+                       RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+                       // operation is the lowest level Sandesha2 should be attached
+                       ArrayList msgsNotToSend = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation()).getMsgTypesToDrop();
+
+                       if (msgsNotToSend != null && msgsNotToSend.contains(new Integer(rmMsgCtx.getMessageType()))) {
+                               return;
+                       }
+
+                       updateMessage(msgCtx);
+
+                       int messageType = senderBean.getMessageType();
+
+                       // checking whether this message can carry piggybacked acks
+                       if (isAckPiggybackableMsgType(messageType)) {
+                               // piggybacking an ack if one is available for the same
+                               // sequence.
+                               // TODO do piggybacking based on wsa:To
+                               AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager);
+                       }
+
+                       // sending the message
+                       TransportOutDescription transportOutDescription = msgCtx.getTransportOut();
+                       TransportSender transportSender = transportOutDescription.getSender();
+
+                       boolean successfullySent = false;
+                       if (transportSender != null) {
+
+                               // have to commit the transaction before sending. This may
+                               // get changed when WS-AT is available.
+                               transaction.commit();
+                               msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+
+                               try {
+
+                                       // had to fully build the SOAP envelope to support
+                                       // retransmissions.
+                                       // Otherwise a 'parserAlreadyAccessed' exception could
+                                       // get thrown in retransmissions.
+                                       // But this has a performance reduction.
+                                       msgCtx.getEnvelope().build();
+
+                                       if (log.isDebugEnabled())
+                                               log.debug("Invoking using transportSender " + transportSender + ", msgCtx="
+                                                               + msgCtx.getEnvelope().getHeader());
+                                       // TODO change this to cater for security.
+                                       transportSender.invoke(msgCtx);
+                                       successfullySent = true;
+                               } catch (Exception e) {
+                                       // A failed send is only logged; the entry is still
+                                       // updated or deleted below.
+                                       String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e
+                                                       .toString());
+                                       log.debug(message, e);
+                               } finally {
+                                       // the rest of the processing runs in a new transaction
+                                       transaction = storageManager.getTransaction();
+                                       msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+                               }
+                       }
+
+                       // update or delete only if the object is still present.
+                       SenderBean bean1 = senderBeanMgr.retrieve(senderBean.getMessageID());
+                       if (bean1 != null) {
+                               if (senderBean.isReSend()) {
+                                       bean1.setSentCount(senderBean.getSentCount());
+                                       bean1.setTimeToSend(senderBean.getTimeToSend());
+                                       senderBeanMgr.update(bean1);
+                               } else {
+                                       senderBeanMgr.delete(bean1.getMessageID());
+
+                                       // removing the message from the storage.
+                                       String messageStoredKey = bean1.getMessageContextRefKey();
+                                       storageManager.removeMessageContext(messageStoredKey);
+                               }
+                       }
+
+                       if (successfullySent) {
+                               if (!msgCtx.isServerSide())
+                                       checkForSyncResponses(msgCtx);
+                       }
+
+                       if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+                               // terminate sending side.
+                               TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx
+                                               .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+                               String sequenceID = terminateSequence.getIdentifier().getIdentifier();
+                               ConfigurationContext configContext = msgCtx.getConfigurationContext();
+
+                               String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceID,
+                                               Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
+                               TerminateManager.terminateSendingSide(configContext, internalSequenceID, msgCtx.isServerSide(),
+                                               storageManager);
+                       }
+
+                       msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+               } catch (SandeshaStorageException e) {
+                       // storage failures undo the work of the active transaction
+                       log.error("SenderWorker: storage error while processing a message, rolling back", e);
+                       if (transaction!=null && transaction.isActive())
+                               transaction.rollback();
+               } catch (SandeshaException e) {
+                       // no rollback here: the finally block commits the active transaction
+                       log.error("SenderWorker: error while processing a message", e);
+               } catch (MissingResourceException e) {
+                       log.error("SenderWorker: message resource missing while processing a message", e);
+               } finally {
+                       if (transaction!=null && transaction.isActive())
+                               transaction.commit();
+               }
+       }
+       
+       /**
+        * Placeholder called by run() before a message is sent. It currently does
+        * nothing.
+        *
+        * @param msgCtx1 the message context that is about to be sent
+        * @throws SandeshaException declared for future use; never thrown by the empty body
+        */
+       private void updateMessage(MessageContext msgCtx1) throws SandeshaException {
+               // do updates if required.
+       }
+       
+       /**
+        * Tells whether an acknowledgement may be piggybacked on a message of the
+        * given type. Every type except ACK qualifies.
+        *
+        * @param messageType one of the Sandesha2Constants.MessageTypes values
+        * @return false for ACK messages, true for every other type
+        */
+       private boolean isAckPiggybackableMsgType(int messageType) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Enter: Sender::isAckPiggybackableMsgType, " + messageType);
+               }
+
+               final boolean canCarryAck = (messageType != Sandesha2Constants.MessageTypes.ACK);
+
+               if (log.isDebugEnabled()) {
+                       log.debug("Exit: Sender::isAckPiggybackableMsgType, " + canCarryAck);
+               }
+               return canCarryAck;
+       }
+       
+       private void checkForSyncResponses(MessageContext msgCtx) throws 
SandeshaException {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: Sender::checkForSyncResponses, " + 
msgCtx.getEnvelope().getHeader());
+
+               try {
+
+                       boolean responsePresent = 
(msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null);
+                       if (!responsePresent)
+                               return;
+
+                       // create the responseMessageContext
+
+                       MessageContext responseMessageContext = new 
MessageContext();
+                       responseMessageContext.setServerSide(false);
+                       
responseMessageContext.setConfigurationContext(msgCtx.getConfigurationContext());
+                       
responseMessageContext.setTransportIn(msgCtx.getTransportIn());
+                       
responseMessageContext.setTransportOut(msgCtx.getTransportOut());
+
+                       
responseMessageContext.setProperty(MessageContext.TRANSPORT_IN, msgCtx
+                                       
.getProperty(MessageContext.TRANSPORT_IN));
+                       
responseMessageContext.setServiceContext(msgCtx.getServiceContext());
+                       
responseMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
+
+                       // copying required properties from op. context to the 
response msg
+                       // ctx.
+                       OperationContext requestMsgOpCtx = 
msgCtx.getOperationContext();
+                       if (requestMsgOpCtx != null) {
+                               if 
(responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE) 
== null) {
+                                       
responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE, 
requestMsgOpCtx
+                                                       
.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE));
+                               }
+
+                               if 
(responseMessageContext.getProperty(HTTPConstants.CHAR_SET_ENCODING) == null) {
+                                       
responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING, 
requestMsgOpCtx
+                                                       
.getProperty(HTTPConstants.CHAR_SET_ENCODING));
+                               }
+                       }
+
+                       // If request is REST we assume the 
responseMessageContext is REST,
+                       // so set the variable
+
+                       
responseMessageContext.setDoingREST(msgCtx.isDoingREST());
+
+                       SOAPEnvelope resenvelope = null;
+                       try {
+                               resenvelope = 
TransportUtils.createSOAPMessage(msgCtx, 
msgCtx.getEnvelope().getNamespace().getName());
+
+                       } catch (AxisFault e) {
+                               // TODO Auto-generated catch block
+                               
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapEnvNotSet));
+                               log.debug(e.getStackTrace().toString());
+                       }
+
+                       // if the request msg ctx is within a transaction, 
processing of
+                       // the response should also happen
+                       // within the same transaction
+                       
responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, msgCtx
+                                       
.getProperty(Sandesha2Constants.WITHIN_TRANSACTION));
+
+                       if (resenvelope != null) {
+                               responseMessageContext.setEnvelope(resenvelope);
+                               AxisEngine engine = new 
AxisEngine(msgCtx.getConfigurationContext());
+
+                               if (isFaultEnvelope(resenvelope)) {
+                                       
engine.receiveFault(responseMessageContext);
+                               } else {
+                                       engine.receive(responseMessageContext);
+                               }
+                       }
+
+               } catch (Exception e) {
+                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+                       log.debug(message, e);
+                       throw new SandeshaException(message, e);
+               }
+               if (log.isDebugEnabled())
+                       log.debug("Exit: Sender::checkForSyncResponses");
+       }
+       
+       /**
+        * Reports whether the body of the given SOAP envelope contains a fault.
+        * Entry and exit are traced at debug level.
+        *
+        * @param envelope the envelope whose body is inspected
+        * @return true when envelope.getBody().getFault() is non-null, false otherwise
+        * @throws SandeshaException declared in the signature; no statement in this
+        *         body throws it explicitly
+        */
+       private boolean isFaultEnvelope(SOAPEnvelope envelope) throws SandeshaException {
+               // Look the fault up once; the same value feeds the entry trace and the result.
+               SOAPFault bodyFault = envelope.getBody().getFault();
+               if (log.isDebugEnabled())
+                       log.debug("Enter: Sender::isFaultEnvelope, " + bodyFault);
+
+               boolean carriesFault = (bodyFault != null);
+               if (log.isDebugEnabled()) {
+                       if (carriesFault)
+                               log.debug("Exit: Sender::isFaultEnvelope, TRUE");
+                       else
+                               log.debug("Exit: Sender::isFaultEnvelope, FALSE");
+               }
+               return carriesFault;
+       }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to