Author: asankha
Date: Sun Dec 7 20:48:21 2008
New Revision: 724253
URL: http://svn.apache.org/viewvc?rev=724253&view=rev
Log:
AXIS2-4164 - Add two new parameters to specify JMS username and password
(transport.jms.UserName and transport.jms.Password)
Fix issue with the generation of the WSDL EPR
Correctly call JMSMessageSender.close() after a send operation, and release JMS
resources as appropriate
Support ActiveMQ dynamicQueues/ and dynamicTopics/ prefixes for convenience,
since most of the demonstrations will be against AMQ
Test on connection termination/exceptions and re-connection - probably could
refine this logic a bit more
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
Sun Dec 7 20:48:21 2008
@@ -254,8 +254,8 @@
try {
connection = JMSUtils.createConnection(
conFactory,
- parameters.get(Context.SECURITY_PRINCIPAL),
- parameters.get(Context.SECURITY_CREDENTIALS),
+ parameters.get(JMSConstants.PARAM_JMS_USERNAME),
+ parameters.get(JMSConstants.PARAM_JMS_PASSWORD),
isJmsSpec11(), isQueue());
if (log.isDebugEnabled()) {
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
Sun Dec 7 20:48:21 2008
@@ -188,6 +188,11 @@
/** @see PARAM_RECON_INIT_DURATION */
public static final String PARAM_RECON_MAX_DURATION =
"transport.jms.MaxReconnectDuration";
+ /** The username to use when obtaining a JMS Connection */
+ public static final String PARAM_JMS_USERNAME = "transport.jms.UserName";
+ /** The password to use when obtaining a JMS Connection */
+ public static final String PARAM_JMS_PASSWORD = "transport.jms.Password";
+
//-------------- message context / transport header properties and client
options --------------
/**
* A MessageContext property or client Option indicating the JMS message
type
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
Sun Dec 7 20:48:21 2008
@@ -122,8 +122,7 @@
JMSEndpoint endpoint = new JMSEndpoint();
endpoint.setService(service);
endpoint.setCf(cf);
- endpoint.computeEPRs(); // compute service EPR and keep for later use
-
+
Parameter destParam =
service.getParameter(JMSConstants.PARAM_DESTINATION);
if (destParam != null) {
endpoint.setJndiDestinationName((String)destParam.getValue());
@@ -135,7 +134,7 @@
Parameter destTypeParam =
service.getParameter(JMSConstants.PARAM_DEST_TYPE);
if (destTypeParam != null) {
String paramValue = (String) destTypeParam.getValue();
- if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
+ if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
endpoint.setDestinationType(paramValue);
} else {
@@ -156,6 +155,8 @@
} else {
endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam));
}
+
+ endpoint.computeEPRs(); // compute service EPR and keep for later use
serviceNameToEndpointMap.put(service.getName(), endpoint);
ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf,
service, workerPool);
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
Sun Dec 7 20:48:21 2008
@@ -230,27 +230,33 @@
* Close non-shared producer, session and connection if any
*/
public void close() {
- if (cacheLevel < JMSConstants.CACHE_PRODUCER) {
+ if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) {
try {
producer.close();
} catch (JMSException e) {
log.error("Error closing JMS MessageProducer after send", e);
+ } finally {
+ producer = null;
}
}
- if (cacheLevel < JMSConstants.CACHE_SESSION) {
+ if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) {
try {
session.close();
} catch (JMSException e) {
log.error("Error closing JMS Session after send", e);
+ } finally {
+ session = null;
}
}
- if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
try {
connection.close();
} catch (JMSException e) {
log.error("Error closing JMS Connection after send", e);
+ } finally {
+ connection = null;
}
}
}
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
Sun Dec 7 20:48:21 2008
@@ -168,8 +168,12 @@
try {
return JMSUtils.lookup(context, Destination.class,
destinationName);
} catch (NameNotFoundException e) {
- if (log.isDebugEnabled()) {
- log.debug("Cannot locate destination : " + destinationName + "
using " + url);
+ try {
+ return JMSUtils.lookup(context, Destination.class,
+
(JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ?
+ "dynamicTopics/" : "dynamicQueues/") +
destinationName);
+ } catch (NamingException x) {
+ handleException("Cannot locate destination : " +
destinationName + " using " + url);
}
} catch (NamingException e) {
handleException("Cannot locate destination : " + destinationName +
" using " + url, e);
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
Sun Dec 7 20:48:21 2008
@@ -72,9 +72,9 @@
*/
private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo
trpInfo) {
Map<String,String> props = trpInfo.getProperties();
- if(trpInfo.getProperties() != null) {
+ if (trpInfo.getProperties() != null) {
String jmsConnectionFactoryName =
props.get(JMSConstants.PARAM_JMS_CONFAC);
- if(jmsConnectionFactoryName != null) {
+ if (jmsConnectionFactoryName != null) {
return
connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
} else {
return connFacManager.getJMSConnectionFactory(props);
@@ -134,7 +134,11 @@
// need to synchronize as Sessions are not thread safe
synchronized (messageSender.getSession()) {
- sendOverJMS(msgCtx, messageSender, contentTypeProperty,
jmsConnectionFactory, jmsOut);
+ try {
+ sendOverJMS(msgCtx, messageSender, contentTypeProperty,
jmsConnectionFactory, jmsOut);
+ } finally {
+ messageSender.close();
+ }
}
}
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
Sun Dec 7 20:48:21 2008
@@ -80,7 +80,8 @@
* Get the EPR for the given JMS connection factory and destination
* the form of the URL is
* jms:/<destination>?[<key>=<value>&]*
- * Credentials Context.SECURITY_PRINCIPAL and Context.SECURITY_CREDENTIALS
are filtered
+ * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS
+ * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are
filtered
*
* @param cf the Axis2 JMS connection factory
* @param destinationType the type of destination
@@ -110,7 +111,9 @@
for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) {
if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) &&
-
!Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey())) {
+ !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey())
&&
+
!JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) &&
+
!JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) {
sb.append("&").append(
entry.getKey()).append("=").append(entry.getValue());
}
@@ -584,12 +587,16 @@
ServiceTaskManager stm = new ServiceTaskManager();
stm.setServiceName(name);
- stm.setJndiProperties(jcf.getParameters());
+ stm.addJmsProperties(cf);
+ stm.addJmsProperties(svc);
stm.setConnFactoryJNDIName(
getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc,
cf));
- stm.setDestinationJNDIName(
- getRqdStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf));
+ String destName =
getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf);
+ if (destName == null) {
+ destName = service.getName();
+ }
+ stm.setDestinationJNDIName(destName);
stm.setDestinationType(getDestinationType(svc, cf));
stm.setJmsSpec11(
@@ -624,7 +631,7 @@
if (value != null) {
stm.setConcurrentConsumers(value);
}
- value =
getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf);
+ value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc,
cf);
if (value != null) {
stm.setMaxConcurrentConsumers(value);
}
@@ -651,6 +658,29 @@
}
stm.setWorkerPool(workerPool);
+
+ // remove processed properties from property bag
+ stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER);
+ stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY);
+ stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN);
+ stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED);
+ stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR);
+ stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE);
+ stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL);
+ stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL);
+ stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT);
+ stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS);
+ stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS);
+ stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT);
+ stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR);
+
return stm;
}
@@ -990,9 +1020,9 @@
jmsOut.loadConnectionFactoryFromProperies();
// create a one time connection and session to be used
- Hashtable<String,String> jndiProps = jmsOut.getProperties();
- String user = jndiProps != null ?
jndiProps.get(Context.SECURITY_PRINCIPAL) : null;
- String pass = jndiProps != null ?
jndiProps.get(Context.SECURITY_CREDENTIALS) : null;
+ Hashtable<String,String> jmsProps = jmsOut.getProperties();
+ String user = jmsProps != null ?
jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null;
+ String pass = jmsProps != null ?
jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null;
QueueConnectionFactory qConFac = null;
TopicConnectionFactory tConFac = null;
@@ -1041,8 +1071,9 @@
}
return new JMSMessageSender(connection, session, producer,
- destination, JMSConstants.CACHE_NONE, false,
- destType == -1 ? null : destType == JMSConstants.QUEUE ?
Boolean.TRUE : Boolean.FALSE);
+ destination, (jmsOut.getJmsConnectionFactory() == null ?
+ JMSConstants.CACHE_NONE :
jmsOut.getJmsConnectionFactory().getCacheLevel()), false,
+ destType == -1 ? null : destType == JMSConstants.QUEUE ?
Boolean.TRUE : Boolean.FALSE);
}
/**
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
Sun Dec 7 20:48:21 2008
@@ -32,10 +32,7 @@
import javax.transaction.UserTransaction;
import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
+import java.util.*;
/**
* Each service will have one ServiceTaskManager instance that will create,
manage and also destroy
@@ -116,8 +113,8 @@
/** Upper limit on reconnection attempt duration */
private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour
- /** The JNDI context properties */
- private Hashtable<String,String> jndiProperties = null;
+ /** The JNDI context properties and other general properties */
+ private Hashtable<String,String> jmsProperties = new Hashtable<String,
String>();
/** The JNDI Context acuired */
private Context context = null;
/** The ConnectionFactory to be used */
@@ -303,6 +300,20 @@
}
/**
+ * Get the number of MessageListenerTasks that are currently connected to
the JMS provider
+ * @return connected task count
+ */
+ private int getConnectedTaskCount() {
+ int count = 0;
+ for (MessageListenerTask lstTask : pollingTasks) {
+ if (lstTask.isConnected()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
* The actual threads/tasks that perform message polling
*/
private class MessageListenerTask implements Runnable, ExceptionListener {
@@ -319,6 +330,8 @@
private int idleExecutionCount = 0;
/** Is this task idle right now? */
private volatile boolean idle = false;
+ /** Is this task connected to the JMS provider successfully? */
+ private boolean connected = false;
/** As soon as we create a new polling task, add it to the STM for
control later */
MessageListenerTask() {
@@ -369,49 +382,58 @@
log.debug("New poll task starting : thread id = " +
Thread.currentThread().getId());
}
- while (isActive() &&
- (getMaxMessagesPerTask() < 0 || messageCount <
getMaxMessagesPerTask()) &&
- (getConcurrentConsumers() == 1 || idleExecutionCount <
getIdleTaskExecutionLimit())) {
+ try {
+ while (isActive() &&
+ (getMaxMessagesPerTask() < 0 || messageCount <
getMaxMessagesPerTask()) &&
+ (getConcurrentConsumers() == 1 || idleExecutionCount <
getIdleTaskExecutionLimit())) {
- UserTransaction ut = null;
- try {
- if (transactionality == BaseConstants.TRANSACTION_JTA) {
- ut = getUserTransaction();
- ut.begin();
+ UserTransaction ut = null;
+ try {
+ if (transactionality == BaseConstants.TRANSACTION_JTA)
{
+ ut = getUserTransaction();
+ ut.begin();
+ }
+ } catch (NotSupportedException e) {
+ handleException("Listener Task is already associated
with a transaction", e);
+ } catch (SystemException e) {
+ handleException("Error starting a JTA transaction", e);
}
- } catch (NotSupportedException e) {
- handleException("Listener Task is already associated with
a transaction", e);
- } catch (SystemException e) {
- handleException("Error starting a JTA transaction", e);
- }
- // Get a message by polling, or receive null
- Message message = receiveMessage();
+ // Get a message by polling, or receive null
+ Message message = receiveMessage();
+
+ if (log.isTraceEnabled()) {
+ if (message != null) {
+ try {
+ log.trace("<<<<<<< READ message with Message
ID : " +
+ message.getJMSMessageID() + " from : " +
destination +
+ " by Thread ID : " +
Thread.currentThread().getId());
+ } catch (JMSException ignore) {}
+ } else {
+ log.trace("No message received by Thread ID : " +
+ Thread.currentThread().getId() + " for
destination : " + destination);
+ }
+ }
- if (log.isTraceEnabled()) {
if (message != null) {
- try {
- log.trace("<<<<<<< READ message with Message ID :
" +
- message.getJMSMessageID() + " from : " +
destination +
- " by Thread ID : " +
Thread.currentThread().getId());
- } catch (JMSException ignore) {}
+ idle = false;
+ idleExecutionCount = 0;
+ messageCount++;
+ // I will be busy now while processing this message,
so start another if needed
+ scheduleNewTaskIfAppropriate();
+ handleMessage(message, ut);
+
} else {
- log.trace("No message received by Thread ID : " +
- Thread.currentThread().getId() + " for destination
: " + destination);
+ idle = true;
+ idleExecutionCount++;
}
}
- if (message != null) {
- idle = false;
- idleExecutionCount = 0;
- messageCount++;
- // I will be busy now while processing this message, so
start another if needed
- scheduleNewTaskIfAppropriate();
- handleMessage(message, ut);
-
- } else {
- idle = true;
- idleExecutionCount++;
+ } finally {
+ workerState = STATE_STOPPED;
+ activeTaskCount--;
+ synchronized(pollingTasks) {
+ pollingTasks.remove(this);
}
}
@@ -431,10 +453,6 @@
closeSession(true);
closeConnection();
- activeTaskCount--;
- synchronized(pollingTasks) {
- pollingTasks.remove(this);
- }
// My time is up, so if I am going away, create another
scheduleNewTaskIfAppropriate();
}
@@ -561,9 +579,13 @@
public void onException(JMSException j) {
if (!isSTMActive()) {
+ requestShutdown();
return;
}
+ log.warn("JMS Connection failure : " + j.getMessage());
+ setConnected(false);
+
if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
// failed Connection was not shared, thus no need to restart
the whole STM
requestShutdown();
@@ -572,7 +594,7 @@
// if we failed while active, update state to show failure
setServiceTaskManagerState(STATE_FAILURE);
- log.error("JMS Connection failed : " + j.getMessage() + " -
shutting down worker tasks", j);
+ log.error("JMS Connection failed : " + j.getMessage() + " -
shutting down worker tasks");
int r = 1;
long retryDuration = initialReconnectDuration;
@@ -581,9 +603,22 @@
try {
log.info("Reconnection attempt : " + r + " for service : "
+ serviceName);
start();
- } catch (Exception e) {
+ } catch (Exception ignore) {}
+
+ boolean connected = false;
+ for (int i=0; i<5; i++) {
+ if (getConnectedTaskCount() == concurrentConsumers) {
+ connected = true;
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {}
+ }
+
+ if (!connected) {
log.error("Reconnection attempt : " + (r++) + " for
service : " + serviceName +
- " failed. Next retry in " + (retryDuration/1000) +
"seconds", e);
+ " failed. Next retry in " + (retryDuration/1000) +
"seconds");
retryDuration = (long) (retryDuration *
reconnectionProgressionFactor);
if (retryDuration > maxReconnectDuration) {
retryDuration = maxReconnectDuration;
@@ -593,7 +628,8 @@
Thread.sleep(retryDuration);
} catch (InterruptedException ignore) {}
}
- } while (!isSTMActive());
+
+ } while (!isSTMActive() || getConnectedTaskCount() <
concurrentConsumers);
}
protected void requestShutdown() {
@@ -608,6 +644,14 @@
return idle;
}
+ public boolean isConnected() {
+ return connected;
+ }
+
+ public void setConnected(boolean connected) {
+ this.connected = connected;
+ }
+
/**
* Get a Connection that could/should be used by this task - depends
on the cache level to reuse
* @return the shared Connection if cache level is higher than
CACHE_NONE, or a new Connection
@@ -630,6 +674,7 @@
}
}
}
+ setConnected(true);
return connection;
}
@@ -733,15 +778,15 @@
log.info("Connected to the JMS connection factory : " +
getConnFactoryJNDIName());
} catch (NamingException e) {
handleException("Error looking up connection factory : " +
getConnFactoryJNDIName() +
- " using JNDI properties : " + jndiProperties, e);
+ " using JNDI properties : " + jmsProperties, e);
}
Connection connection = null;
try {
connection = JMSUtils.createConnection(
conFactory,
- jndiProperties.get(Context.SECURITY_PRINCIPAL),
- jndiProperties.get(Context.SECURITY_CREDENTIALS),
+ jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME),
+ jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD),
isJmsSpec11(), isQueue());
connection.setExceptionListener(this);
@@ -750,7 +795,7 @@
} catch (JMSException e) {
handleException("Error acquiring a JMS connection to : " +
getConnFactoryJNDIName() +
- " using JNDI properties : " + jndiProperties, e);
+ " using JNDI properties : " + jmsProperties, e);
}
return connection;
}
@@ -786,7 +831,7 @@
}
return JMSUtils.createConsumer(
- session, getDestination(), isQueue(),
+ session, getDestination(session), isQueue(),
(isSubscriptionDurable() && getDurableSubscriberName() ==
null ?
getDurableSubscriberName() : serviceName),
getMessageSelector(), isPubSubNoLocal(),
isSubscriptionDurable(), isJmsSpec11());
@@ -806,7 +851,7 @@
*/
private Context getInitialContext() throws NamingException {
if (context == null) {
- context = new InitialContext(jndiProperties);
+ context = new InitialContext(jmsProperties);
}
return context;
}
@@ -815,7 +860,7 @@
* Return the JMS Destination for the JNDI name of the Destination from
the InitialContext
* @return the JMS Destination to which this STM listens for messages
*/
- private Destination getDestination() {
+ private Destination getDestination(Session session) {
if (destination == null) {
try {
context = getInitialContext();
@@ -825,8 +870,26 @@
" found for service " + serviceName);
}
} catch (NamingException e) {
- handleException("Error looking up JMS destination : " +
getDestinationJNDIName() +
- " using JNDI properties : " + jndiProperties, e);
+ try {
+ switch (destinationType) {
+ case JMSConstants.QUEUE: {
+ destination =
session.createQueue(getDestinationJNDIName());
+ break;
+ }
+ case JMSConstants.TOPIC: {
+ destination =
session.createTopic(getDestinationJNDIName());
+ break;
+ }
+ default: {
+ handleException("Error looking up JMS destination
: " +
+ getDestinationJNDIName() + " using JNDI
properties : " +
+ jmsProperties, e);
+ }
+ }
+ } catch (JMSException j) {
+ handleException("Error looking up and creating JMS
destination : " +
+ getDestinationJNDIName() + " using JNDI properties : "
+ jmsProperties, e);
+ }
}
}
return destination;
@@ -848,7 +911,7 @@
JMSUtils.lookup(context, UserTransaction.class,
getUserTransactionJNDIName());
} catch (NamingException e) {
handleException("Error looking up UserTransaction : " +
getDestinationJNDIName() +
- " using JNDI properties : " + jndiProperties, e);
+ " using JNDI properties : " + jmsProperties, e);
}
}
@@ -862,7 +925,7 @@
}
} catch (NamingException e) {
handleException("Error looking up UserTransaction : " +
getDestinationJNDIName() +
- " using JNDI properties : " + jndiProperties, e);
+ " using JNDI properties : " + jmsProperties, e);
}
}
return sharedUserTransaction;
@@ -1100,12 +1163,16 @@
this.jmsSpec11 = jmsSpec11;
}
- public Hashtable<String, String> getJndiProperties() {
- return jndiProperties;
+ public Hashtable<String, String> getJmsProperties() {
+ return jmsProperties;
+ }
+
+ public void addJmsProperties(Map<String, String> jmsProperties) {
+ this.jmsProperties.putAll(jmsProperties);
}
- public void setJndiProperties(Hashtable<String, String> jndiProperties) {
- this.jndiProperties = jndiProperties;
+ public void removeJmsProperties(String key) {
+ this.jmsProperties.remove(key);
}
public Context getContext() {
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
Sun Dec 7 20:48:21 2008
@@ -97,6 +97,9 @@
transport.UserTxnJNDIName
transport.CacheUserTxn - true | false
+transport.jms.UserName - user name to use when creating a new JMS Connection
+transport.jms.Password - password to use when creating a new JMS Connection
+
transport.jms.PublishEPR - one or more EPR's could be specified. If none
specified, defaults to