Hello,

- Given a network of V5.5.1 brokers named "brokerA" and "brokerB", and
- application "serviceA" is connected to "brokerA", subscribed to a "request" topic and ready to produce on a "response" topic, and
- client application "clientB" is connected to "brokerB", and
- that client first creates a temporary consumer subscription on the "response" topic on "brokerB", then produces a request message for the "request" topic, and
- "serviceA" receives the request message and produces the response message on the "response" topic on "brokerA",
- then the message is not always received by the temporary consumer on "brokerB" (the receive times out).

This happens often enough to be an annoyance in our production environment. We have verified that the service responds, but we can't understand why the subscriber starves.

My current theory is that when serviceA responds on brokerA, brokerA is not yet aware of the new consumer/subscriber on the response topic (maybe the network-of-brokers advisory messages haven't been processed fast enough), and without subscribers the message goes to Nirvana. Can this theoretically be the case?

We use Spring JmsTemplate on both ends of the topics and the following classes to do the synchronous request/response. The three steps that matter are marked (1) to (3) in doInJms:

    import java.io.Serializable;
    import java.util.UUID;

    import javax.annotation.Resource;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;

    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.SessionCallback;
    import org.springframework.jms.support.JmsUtils;
    import org.springframework.stereotype.Component;

    /*
     * based on http://codedependents.com/2010/03/04/synchronous-request-response-with-activemq-and-spring/
     */
    @Component
    public class SynchronousJmsRequestor {

        private static final int TIMEOUT = 5000;

        private final Destination requestDestination;
        private final Destination responseDestination;
        private long timeout = TIMEOUT;
        private final String clientUid;

        @Resource
        private JmsTemplate jmsTemplate;

        public SynchronousJmsRequestor(
                final Destination requestDestination,
                final Destination responseDestination,
                final String clientUid) {
            this.responseDestination = responseDestination;
            this.requestDestination = requestDestination;
            this.clientUid = clientUid;
        }

        public Serializable request(final String serverUid, final Serializable payload)
                throws JMSException {
            JmsDelegate jmsDelegate = new JmsDelegate(payload, requestDestination,
                    responseDestination, serverUid, clientUid, timeout);
            // 'true' starts the connection so that the consumer can receive
            ObjectMessage response = (ObjectMessage) jmsTemplate.execute(jmsDelegate, true);
            // null means the receive timed out
            return response == null ? null : response.getObject();
        }

        public long getTimeout() {
            return timeout;
        }

        public void setTimeout(long timeout) {
            this.timeout = timeout;
        }
    }

    class JmsDelegate implements SessionCallback<Message> {

        private Serializable payload;
        private final Destination requestDestination;
        private final Destination responseDestination;
        private String serverUid;
        private final String clientUid;
        private long timeout;

        public JmsDelegate(Serializable payload, Destination requestDestination,
                Destination responseDestination, String serverUid, String clientUid,
                long timeout) {
            super();
            this.payload = payload;
            this.requestDestination = requestDestination;
            this.responseDestination = responseDestination;
            this.serverUid = serverUid;
            this.clientUid = clientUid;
            this.timeout = timeout;
        }

        @Override
        public Message doInJms(final Session session) throws JMSException {
            MessageConsumer consumer = null;
            MessageProducer producer = null;
            try {
                final String correlationId = UUID.randomUUID().toString();

                // (1) subscribe to the response topic, filtered by correlation ID
                consumer = session.createConsumer(responseDestination,
                        "JMSCorrelationID = '" + correlationId + "'");

                final ObjectMessage message = session.createObjectMessage(payload);
                message.setJMSCorrelationID(correlationId);
                message.setStringProperty("CLIENT_ID", clientUid);
                message.setStringProperty("SERVER_ID", serverUid);

                // (2) send the request
                producer = session.createProducer(requestDestination);
                producer.send(message);

                // (3) block until the response arrives or the timeout expires
                return consumer.receive(timeout);
            } finally {
                // Don't forget to close your resources
                JmsUtils.closeMessageConsumer(consumer);
                JmsUtils.closeMessageProducer(producer);
            }
        }
    }

Cheers,
Kai
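
P.S. To make the theory concrete, here is a toy model of the race I suspect. It is plain Java with no JMS involved, and it is not how ActiveMQ is actually implemented: the class ToyBroker, the method learnRemoteSubscription and the rule "forward only if a remote subscriber is already known" are my own assumptions for illustration. It only shows that, if brokerA behaves this way, the outcome depends purely on whether the subscription notice or the response arrives first.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the suspected race. NOT ActiveMQ code; all names are hypothetical. */
public class SubscriptionRaceModel {

    /** Assumed behaviour: forwards a topic message only if a remote subscriber is already known. */
    static class ToyBroker {
        final Set<String> knownRemoteTopics = new HashSet<>();
        final List<String> forwarded = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();

        /** Models the arrival of the subscription notice from the peer broker. */
        void learnRemoteSubscription(String topic) {
            knownRemoteTopics.add(topic);
        }

        /** Non-durable topic: without a known subscriber the message is discarded. */
        void publish(String topic, String body) {
            if (knownRemoteTopics.contains(topic)) {
                forwarded.add(body);
            } else {
                dropped.add(body);
            }
        }
    }

    /** Returns true if the response would reach the temporary consumer on brokerB. */
    public static boolean roundTrip(boolean noticeArrivesBeforeResponse) {
        ToyBroker brokerA = new ToyBroker();

        // clientB has subscribed on brokerB; the notice may or may not have reached brokerA yet.
        if (noticeArrivesBeforeResponse) {
            brokerA.learnRemoteSubscription("response");
        }

        // serviceA publishes the response on brokerA.
        brokerA.publish("response", "reply");

        // A late notice does not bring back a message that was already discarded.
        if (!noticeArrivesBeforeResponse) {
            brokerA.learnRemoteSubscription("response");
        }
        return brokerA.forwarded.contains("reply");
    }

    public static void main(String[] args) {
        System.out.println("notice first:   " + roundTrip(true));   // prints true
        System.out.println("response first: " + roundTrip(false));  // prints false
    }
}
```

If the real brokers behave like this model, the timeouts we see would correspond to the "response first" case.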