Author: asankha
Date: Sun Nov 30 09:47:29 2008
New Revision: 721864
URL: http://svn.apache.org/viewvc?rev=721864&view=rev
Log:
Share the connection between all tasks of an STM when the cache level is
connection or above
(http://markmail.org/message/j2f5xdrtfeuoup7f)
Use two variables to store state of STM and its individual tasks
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=721864&r1=721863&r2=721864&view=diff
==============================================================================
---
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
(original)
+++
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
Sun Nov 30 09:47:29 2008
@@ -132,12 +132,15 @@
private JMSMessageReceiver jmsMessageReceiver = null;
/** State of this Task Manager */
- private volatile int state = STATE_STOPPED;
+ private volatile int serviceTaskManagerState = STATE_STOPPED;
/** Number of invoker tasks active */
private volatile int activeTaskCount = 0;
/** The shared thread pool from the Listener */
private WorkerPool workerPool = null;
+ /** The JMS Connection shared between multiple polling tasks - when
enabled (recommended) */
+ private Connection sharedConnection = null;
+
/**
* Start or re-start the Task Manager by shutting down any existing worker
tasks and
* re-creating them. However, if this STM is PAUSED, a start request is
ignored.
@@ -146,13 +149,13 @@
*/
public synchronized void start() {
- if (state == STATE_PAUSED) {
+ if (serviceTaskManagerState == STATE_PAUSED) {
log.info("Attempt to re-start paused TaskManager is ignored.
Please use resume instead");
return;
}
// if any tasks are running, stop whats running now
- if (pollingTasks.isEmpty()) {
+ if (!pollingTasks.isEmpty()) {
stop();
}
@@ -167,7 +170,7 @@
"worker tasks of service : " + serviceName);
break;
case JMSConstants.CACHE_CONNECTION:
- log.debug("Only the JMS Connection will be cached and shared
between successive " +
+ log.debug("Only the JMS Connection will be cached and shared
between *all* " +
"poller task invocations");
break;
case JMSConstants.CACHE_SESSION:
@@ -188,7 +191,7 @@
workerPool.execute(new MessageListenerTask());
}
- state = STATE_STARTED;
+ serviceTaskManagerState = STATE_STARTED;
log.info("Task manager for service : " + serviceName + "
[re-]initialized");
}
@@ -201,8 +204,8 @@
log.debug("Stopping ServiceTaskManager for service : " +
serviceName);
}
- if (state != STATE_FAILURE) {
- state = STATE_SHUTTING_DOWN;
+ if (serviceTaskManagerState != STATE_FAILURE) {
+ serviceTaskManagerState = STATE_SHUTTING_DOWN;
}
synchronized(pollingTasks) {
@@ -221,12 +224,22 @@
} catch (InterruptedException ignore) {}
}
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.stop();
+ } catch (JMSException e) {
+ logError("Error stopping shared Connection", e);
+ } finally {
+ sharedConnection = null;
+ }
+ }
+
if (activeTaskCount > 0) {
log.warn("Unable to shutdown all polling tasks of service : " +
serviceName);
}
- if (state != STATE_FAILURE) {
- state = STATE_STOPPED;
+ if (serviceTaskManagerState != STATE_FAILURE) {
+ serviceTaskManagerState = STATE_STOPPED;
}
log.info("Task manager for service : " + serviceName + " shutdown");
}
@@ -235,19 +248,33 @@
* Temporarily suspend receipt and processing of messages. Accomplished by
stopping the
* connection / or connections used by the poller tasks
*/
- public void pause() {
+ public synchronized void pause() {
for (MessageListenerTask lstTask : pollingTasks) {
lstTask.pause();
}
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.stop();
+ } catch (JMSException e) {
+ logError("Error pausing shared Connection", e);
+ }
+ }
}
/**
* Resume receipt and processing of messages of paused tasks
*/
- public void resume() {
+ public synchronized void resume() {
for (MessageListenerTask lstTask : pollingTasks) {
lstTask.resume();
}
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.start();
+ } catch (JMSException e) {
+ logError("Error resuming shared Connection", e);
+ }
+ }
}
/**
@@ -255,7 +282,7 @@
* e do not have any idle tasks - i.e. scale up listening
*/
private void scheduleNewTaskIfAppropriate() {
- if (state == STATE_STARTED &&
+ if (serviceTaskManagerState == STATE_STARTED &&
pollingTasks.size() < getMaxConcurrentConsumers() &&
getIdleTaskCount() == 0) {
workerPool.execute(new MessageListenerTask());
}
@@ -287,7 +314,7 @@
/** The MessageConsumer used by the polling task */
private MessageConsumer consumer = null;
/** State of the worker polling task */
- private volatile int state = STATE_STOPPED;
+ private volatile int workerState = STATE_STOPPED;
/** The number of idle (i.e. without fetching a message) polls for
this task */
private int idleExecutionCount = 0;
/** Is this task idle right now? */
@@ -305,14 +332,14 @@
*/
public void pause() {
if (isActive()) {
- if (connection != null) {
+ if (connection != null && cacheLevel <
JMSConstants.CACHE_CONNECTION) {
try {
connection.stop();
} catch (JMSException e) {
log.warn("Error pausing Message Listener task for
service : " + serviceName);
}
}
- state = STATE_PAUSED;
+ workerState = STATE_PAUSED;
}
}
@@ -320,21 +347,21 @@
* Resume this polling task
*/
public void resume() {
- if (connection != null) {
+ if (connection != null && cacheLevel <
JMSConstants.CACHE_CONNECTION) {
try {
connection.start();
} catch (JMSException e) {
log.warn("Error resuming Message Listener task for service
: " + serviceName);
}
}
- state = STATE_STARTED;
+ workerState = STATE_STARTED;
}
/**
* Execute the polling worker task
*/
public void run() {
- state = STATE_STARTED;
+ workerState = STATE_STARTED;
activeTaskCount++;
int messageCount = 0;
@@ -402,7 +429,7 @@
closeConsumer(true);
closeSession(true);
- closeConnection(true);
+ closeConnection();
activeTaskCount--;
synchronized(pollingTasks) {
@@ -524,7 +551,7 @@
}
closeSession(false);
- closeConnection(false);
+ closeConnection();
}
}
@@ -537,8 +564,14 @@
return;
}
+ if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ // failed Connection was not shared, thus no need to restart
the whole STM
+ requestShutdown();
+ return;
+ }
+
// if we failed while active, update state to show failure
- setState(STATE_FAILURE);
+ setServiceTaskManagerState(STATE_FAILURE);
log.error("JMS Connection failed : " + j.getMessage() + " -
shutting down worker tasks", j);
int r = 1;
@@ -564,11 +597,11 @@
}
protected void requestShutdown() {
- state = STATE_SHUTTING_DOWN;
+ workerState = STATE_SHUTTING_DOWN;
}
private boolean isActive() {
- return state == STATE_STARTED;
+ return workerState == STATE_STARTED;
}
protected boolean isTaskIdle() {
@@ -580,8 +613,22 @@
* @return the shared Connection if cache level is higher than
CACHE_NONE, or a new Connection
*/
private Connection getConnection() {
- if (connection == null || cacheLevel <
JMSConstants.CACHE_CONNECTION) {
- connection = createConnection();
+ if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ // Connection is not shared
+ if (connection == null) {
+ connection = createConnection();
+ }
+ } else {
+ if (sharedConnection != null) {
+ connection = sharedConnection;
+ } else {
+ synchronized(this) {
+ if (sharedConnection == null) {
+ sharedConnection = createConnection();
+ }
+ connection = sharedConnection;
+ }
+ }
}
return connection;
}
@@ -618,9 +665,9 @@
* Close the given Connection, hiding exceptions if any which are
logged
* @param connection the Connection to be closed
*/
- private void closeConnection(boolean forced) {
+ private void closeConnection() {
if (connection != null &&
- (cacheLevel < JMSConstants.CACHE_CONNECTION || forced)) {
+ cacheLevel < JMSConstants.CACHE_CONNECTION) {
try {
if (log.isDebugEnabled()) {
log.debug("Closing non-shared JMS connection for
service : " + serviceName);
@@ -823,7 +870,7 @@
// -------------------- trivial methods ---------------------
private boolean isSTMActive() {
- return state == STATE_STARTED;
+ return serviceTaskManagerState == STATE_STARTED;
}
/**
@@ -1085,41 +1132,7 @@
return activeTaskCount;
}
- public void setState(int state) {
- this.state = state;
+ public void setServiceTaskManagerState(int serviceTaskManagerState) {
+ this.serviceTaskManagerState = serviceTaskManagerState;
}
-
- //--------------------- used for development
testing---------------------------
- /*public static void main(String[] args) throws Exception {
- //org.apache.log4j.BasicConfigurator.configure();
- new ServiceTaskManager().testSTM();
- }
-
- private void testSTM() throws Exception {
- ServiceTaskManager stm = new ServiceTaskManager();
- Hashtable<String, String> props = new Hashtable<String, String>();
- props.put("java.naming.factory.initial",
"weblogic.jndi.WLInitialContextFactory");
- props.put("java.naming.provider.url", "t3://localhost:7001");
- stm.setJndiProperties(props);
- stm.setConnFactoryJNDIName("weblogic.jms.ConnectionFactory");
- stm.setDestinationJNDIName("weblogic.examples.jms.MyQueue");
- stm.setServiceName("test");
- stm.setCacheLevel(JMSConstants.CACHE_CONNECTION);
- stm.setMaxConcurrentConsumers(40);
-
- stm.workerPool = new NativeWorkerPool(20, 40, 5, 100, "JMS-Worker",
"jms");
- stm.start();
- }
-
- public boolean processMessage(Message msg, UserTransaction ut) {
- try {
- if (msg instanceof TextMessage) {
- System.out.println("Received : " + ((TextMessage)
msg).getText());
- }
- return true;
- } catch (JMSException e) {
- e.printStackTrace();
- return false;
- }
- }*/
}