Author: veithen
Date: Sat May 15 15:41:30 2010
New Revision: 944668
URL: http://svn.apache.org/viewvc?rev=944668&view=rev
Log:
Attempt to fix the random build failure seen on Hudson. It appears that a test
sometimes sends a message before the JMS transport is listening on the
destination. This works fine for queues, but not for topics, where the
transport can only receive messages once at least one consumer exists.
Slightly modified the JMS transport to track the number of JMS message
consumers and to wait for at least one to be ready.
Modified:
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
Modified:
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
URL:
http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java?rev=944668&r1=944667&r2=944668&view=diff
==============================================================================
---
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
(original)
+++
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
Sat May 15 15:41:30 2010
@@ -87,7 +87,11 @@ public class JMSListener extends Abstrac
stm.start();
for (int i=0; i<3; i++) {
- if (stm.getActiveTaskCount() > 0) {
+ // Check the consumer count rather than the active task count.
Reason: if the
+ // destination is of type topic, then the transport is only ready
to receive
+ // messages if at least one consumer exists. This is of not much
importance,
+ // except for automated tests.
+ if (stm.getConsumerCount() > 0) {
log.info("Started to listen on destination : " +
stm.getDestinationJNDIName() +
" of type " +
JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
" for service " + stm.getServiceName());
Modified:
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL:
http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=944668&r1=944667&r2=944668&view=diff
==============================================================================
---
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
(original)
+++
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
Sat May 15 15:41:30 2010
@@ -34,6 +34,7 @@ import javax.transaction.NotSupportedExc
import javax.transaction.SystemException;
import javax.transaction.Status;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Each service will have one ServiceTaskManager instance that will create,
manage and also destroy
@@ -135,6 +136,8 @@ public class ServiceTaskManager {
private volatile int serviceTaskManagerState = STATE_STOPPED;
/** Number of invoker tasks active */
private volatile int activeTaskCount = 0;
+ /** The number of existing JMS message consumers. */
+ private final AtomicInteger consumerCount = new AtomicInteger();
/** The shared thread pool from the Listener */
private WorkerPool workerPool = null;
@@ -763,6 +766,7 @@ public class ServiceTaskManager {
if (log.isDebugEnabled()) {
log.debug("Closing non-shared JMS consumer for service
: " + serviceName);
}
+ consumerCount.decrementAndGet();
consumer.close();
} catch (JMSException e) {
logError("Error closing JMS consumer", e);
@@ -836,11 +840,13 @@ public class ServiceTaskManager {
log.debug("Creating a new JMS MessageConsumer for service
: " + serviceName);
}
- return JMSUtils.createConsumer(
+ MessageConsumer consumer = JMSUtils.createConsumer(
session, getDestination(session), isQueue(),
(isSubscriptionDurable() && getDurableSubscriberName() ==
null ?
getDurableSubscriberName() : serviceName),
getMessageSelector(), isPubSubNoLocal(),
isSubscriptionDurable(), isJmsSpec11());
+ consumerCount.incrementAndGet();
+ return consumer;
} catch (JMSException e) {
handleException("Error creating JMS consumer for service : " +
serviceName,e);
@@ -1207,7 +1213,16 @@ public class ServiceTaskManager {
public int getActiveTaskCount() {
return activeTaskCount;
}
-
+
+ /**
+ * Get the number of existing JMS message consumers.
+ *
+ * @return the number of consumers
+ */
+ public int getConsumerCount() {
+ return consumerCount.get();
+ }
+
public void setServiceTaskManagerState(int serviceTaskManagerState) {
this.serviceTaskManagerState = serviceTaskManagerState;
}