Author: chamikara
Date: Tue Aug 22 23:28:48 2006
New Revision: 433934
URL: http://svn.apache.org/viewvc?rev=433934&view=rev
Log:
Changed SequenceManager updateClientListnerIfNeeded method to pick the
incomingTransport from the
scheme of the To url. This will only be picked if the transportInProtocol is
not set.
Refactored the Invoker.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
Tue Aug 22 23:28:48 2006
@@ -221,5 +221,7 @@
public static final String secureDummyNoSTR ="secureDummyNoSTR";
public static final String cannotFindTransportInDesc =
"cannotFindTransportInDesc";
+ public static final String toEPRNotSet = "toEPRNotSet";
+
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
Tue Aug 22 23:28:48 2006
@@ -80,7 +80,6 @@
cannotInnitMessage=Sandesha2 Internal error: cannot initialize the message.
propertyInvalidValue=Sandesha2 Internal error: property {0} contains an
invalid value.
couldNotCopyParameters=Could not copy parameters when creating the new RM
Message.
-cannotFindTransportInDesc=Cannot find the transport in description {0} in the
ConfigurationContext.
#-------------------------------------
#
@@ -244,6 +243,8 @@
terminateOpperationIsNull=Terminate Operation was null
invalidMsgNumberList=Invalid msg number list
cannotFindReqMsgFromOpContext=Cannot find the request message from the
operation context
+toEPRNotSet=To EPR has not been set in the given message
+cannotFindTransportInDesc=Cannot find the transport in description {0} in the
ConfigurationContext
#------------------
# Security messages
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
Tue Aug 22 23:28:48 2006
@@ -130,8 +130,13 @@
}
public synchronized SenderBean getNextMsgToSend() {
+
Iterator iterator = table.keySet().iterator();
+ //TODO
+ //pick a random sequence out of the sequences to be sent.
+
+
long lowestAppMsgNo = 0;
while (iterator.hasNext()) {
Object key = iterator.next();
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Tue Aug 22 23:28:48 2006
@@ -6,6 +6,8 @@
*/
package org.apache.sandesha2.util;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Collection;
import javax.xml.namespace.QName;
@@ -349,9 +351,27 @@
try {
if ((startListnerForAsyncAcks ||
startListnerForAsyncControlMsgs) ) {
- if (transportInProtocol == null)
- throw new
SandeshaException(SandeshaMessageHelper
-
.getMessage(SandeshaMessageKeys.cannotStartListenerForIncommingMsgs));
+ if (transportInProtocol == null){
+ EndpointReference toEPR =
messageContext.getOptions().getTo();
+ if (toEPR==null) {
+ String message =
SandeshaMessageHelper.getMessage(
+
SandeshaMessageKeys.toEPRNotSet);
+ throw new AxisFault (message);
+ }
+
+ try {
+ URI uri = new URI
(toEPR.getAddress());
+ String scheme = uri.getScheme();
+
+ //this is a convention is
Axis2. The name of the TransportInDescription has to be the
+ //scheme of a URI of that
transport.
+ //Here we also assume that the
Incoming transport will be same as the outgoing one.
+ transportInProtocol = scheme;
+ } catch (URISyntaxException e) {
+ throw new SandeshaException (e);
+ }
+
+ }
//TODO following code was taken from
ServiceContext.gegMyEPR method.
// When a listner-starting method
becomes available from Axis2, use that.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
Tue Aug 22 23:28:48 2006
@@ -55,21 +55,28 @@
public class Invoker extends Thread {
private boolean runInvoker = false;
+
private ArrayList workingSequences = new ArrayList();
+
private ConfigurationContext context = null;
+
private static final Log log = LogFactory.getLog(Invoker.class);
+
private boolean hasStopped = false;
-
- private transient ThreadFactory threadPool;
- public int INVOKER_THREADPOOL_SIZE =5;
-
- public Invoker () {
- threadPool = new ThreadPool
(INVOKER_THREADPOOL_SIZE,INVOKER_THREADPOOL_SIZE);
- }
+
+ private transient ThreadFactory threadPool;
+
+ public int INVOKER_THREADPOOL_SIZE = 5;
+
+ public Invoker() {
+ threadPool = new ThreadPool(INVOKER_THREADPOOL_SIZE,
+ INVOKER_THREADPOOL_SIZE);
+ }
public synchronized void stopInvokerForTheSequence(String sequenceID) {
if (log.isDebugEnabled())
- log.debug("Enter:
InOrderInvoker::stopInvokerForTheSequence, " + sequenceID);
+ log.debug("Enter:
InOrderInvoker::stopInvokerForTheSequence, "
+ + sequenceID);
workingSequences.remove(sequenceID);
if (workingSequences.size() == 0) {
@@ -109,7 +116,8 @@
return runInvoker;
}
- public synchronized void runInvokerForTheSequence(ConfigurationContext
context, String sequenceID) {
+ public synchronized void runInvokerForTheSequence(
+ ConfigurationContext context, String sequenceID) {
if (log.isDebugEnabled())
log.debug("Enter:
InOrderInvoker::runInvokerForTheSequence");
@@ -128,7 +136,9 @@
private synchronized boolean hasStoppedInvoking() {
if (log.isDebugEnabled()) {
log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
- log.debug("Exit: InOrderInvoker::hasStoppedInvoking, "
+ hasStopped);
+ log
+ .debug("Exit:
InOrderInvoker::hasStoppedInvoking, "
+ + hasStopped);
}
return hasStopped;
}
@@ -169,70 +179,75 @@
boolean rolebacked = false;
try {
- StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(context, context
- .getAxisConfiguration());
+ StorageManager storageManager = SandeshaUtil
+
.getSandeshaStorageManager(context, context
+
.getAxisConfiguration());
NextMsgBeanMgr nextMsgMgr =
storageManager.getNextMsgBeanMgr();
- InvokerBeanMgr storageMapMgr =
storageManager.getStorageMapBeanMgr();
+ InvokerBeanMgr storageMapMgr = storageManager
+ .getStorageMapBeanMgr();
- SequencePropertyBeanMgr sequencePropMgr =
storageManager.getSequencePropertyBeanMgr();
+ SequencePropertyBeanMgr sequencePropMgr =
storageManager
+ .getSequencePropertyBeanMgr();
transaction = storageManager.getTransaction();
// Getting the incomingSequenceIdList
- SequencePropertyBean allSequencesBean =
sequencePropMgr.retrieve(
-
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
-
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+ SequencePropertyBean allSequencesBean =
sequencePropMgr
+ .retrieve(
+
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
if (allSequencesBean == null) {
if (log.isDebugEnabled())
log.debug("AllSequencesBean not
found");
continue;
}
- ArrayList allSequencesList =
SandeshaUtil.getArrayListFromString(allSequencesBean.getValue());
-
+
+ //TODO pick the sequence randomly.
+ ArrayList allSequencesList = SandeshaUtil
+
.getArrayListFromString(allSequencesBean.getValue());
Iterator allSequencesItr =
allSequencesList.iterator();
+ String sequenceId = (String)
allSequencesItr.next();
-// currentIteration: while
(allSequencesItr.hasNext()) {
- String sequenceId = (String)
allSequencesItr.next();
-
- NextMsgBean nextMsgBean =
nextMsgMgr.retrieve(sequenceId);
- if (nextMsgBean == null) {
- String message = "Next message
not set correctly. Removing invalid entry.";
- log.debug(message);
- allSequencesItr.remove();
-
- // cleaning the invalid data of
the all sequences.
-
allSequencesBean.setValue(allSequencesList.toString());
-
sequencePropMgr.update(allSequencesBean);
- continue;
- }
+ NextMsgBean nextMsgBean =
nextMsgMgr.retrieve(sequenceId);
+ if (nextMsgBean == null) {
+ String message = "Next message not set
correctly. Removing invalid entry.";
+ log.debug(message);
+ allSequencesItr.remove();
+
+ // cleaning the invalid data of the all
sequences.
+
allSequencesBean.setValue(allSequencesList.toString());
+
sequencePropMgr.update(allSequencesBean);
+ continue;
+ }
- long nextMsgno =
nextMsgBean.getNextMsgNoToProcess();
- if (nextMsgno <= 0) {
- if (log.isDebugEnabled())
- log.debug("Invalid Next
Message Number " + nextMsgno);
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
-
.toString(nextMsgno));
- throw new
SandeshaException(message);
- }
+ long nextMsgno =
nextMsgBean.getNextMsgNoToProcess();
+ if (nextMsgno <= 0) {
+ if (log.isDebugEnabled())
+ log.debug("Invalid Next Message
Number " + nextMsgno);
+ String message =
SandeshaMessageHelper.getMessage(
+
SandeshaMessageKeys.invalidMsgNumber, Long
+
.toString(nextMsgno));
+ throw new SandeshaException(message);
+ }
- Iterator stMapIt =
storageMapMgr.find(new InvokerBean(null, nextMsgno, sequenceId)).iterator();
+ Iterator stMapIt = storageMapMgr.find(
+ new InvokerBean(null,
nextMsgno, sequenceId))
+ .iterator();
- if (stMapIt.hasNext()) {
+ if (stMapIt.hasNext()) {
- InvokerBean invokerBean =
(InvokerBean) stMapIt.next();
+ InvokerBean invokerBean = (InvokerBean)
stMapIt.next();
- transaction.commit();
-
- //start a new worker thread and
let it do the invocation.
- InvokerWorker worker = new
InvokerWorker (context,invokerBean);
- threadPool.execute(worker);
-
- }
+ transaction.commit();
+ // start a new worker thread and let it
do the invocation.
+ InvokerWorker worker = new
InvokerWorker(context,
+ invokerBean);
+ threadPool.execute(worker);
-// }
+ }
} catch (Exception e) {
if (transaction != null) {
@@ -240,20 +255,22 @@
transaction.rollback();
rolebacked = true;
} catch (Exception e1) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
- .toString());
+ String message =
SandeshaMessageHelper.getMessage(
+
SandeshaMessageKeys.rollbackError, e1
+
.toString());
log.debug(message, e1);
}
}
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invokeMsgError);
+ String message = SandeshaMessageHelper
+
.getMessage(SandeshaMessageKeys.invokeMsgError);
log.debug(message, e);
} finally {
if (!rolebacked && transaction != null) {
try {
transaction.commit();
} catch (Exception e) {
- String message =
SandeshaMessageHelper
-
.getMessage(SandeshaMessageKeys.commitError, e.toString());
+ String message =
SandeshaMessageHelper.getMessage(
+
SandeshaMessageKeys.commitError, e.toString());
log.debug(message, e);
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Tue Aug 22 23:28:48 2006
@@ -171,12 +171,9 @@
transaction.commit();
-
-
//start a worker which will work on this
message.s
SenderWorker worker = new SenderWorker
(context,senderBean);
threadPool.execute(worker);
-
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]