The client, which creates a temporary queue, registers its name in JBoss Cache and listens for messages:
| package hu.molaris.client; | | import java.util.Properties; | | import javax.jms.Connection; | import javax.jms.ConnectionFactory; | import javax.jms.Message; | import javax.jms.MessageConsumer; | import javax.jms.MessageListener; | import javax.jms.Queue; | import javax.jms.Session; | import javax.jms.TextMessage; | import javax.naming.InitialContext; | | public class Client implements MessageListener { | private static final String USER_NAME = "testUser"; | | public static void main(String[] args) throws Exception { | new Client().run(); | } | | @Override | public void onMessage(Message message) { | try { | String receivedText = null; | TextMessage receivedMessage = (TextMessage) message; | receivedText = receivedMessage.getText(); | | System.out.println("Received message: " + receivedText); | } catch (Exception e) { | e.printStackTrace(); | } | } | | public void run() { | Connection connection = null; | Session session = null; | Queue temporaryQueue; | InitialContext ic; | | try { | Properties props = new Properties(); | props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); | props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200"); | props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); | | ic = new InitialContext(props); | ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory"); | | connection = cf.createConnection(); | session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | | temporaryQueue = session.createTemporaryQueue(); | MessageConsumer consumer = session.createConsumer(temporaryQueue); | | /* | * Registering temporary queue name into JBoss Cache | */ | ICommunicationService commService = (ICommunicationService) ic.lookup("CommunicationSession/remote"); | commService.registerClient(Client.USER_NAME, temporaryQueue.getQueueName()); | | connection.start(); | consumer.setMessageListener(this); | | System.out.println("Waiting for messages..."); 
| for (int i = 0; i < 100; i++) { | Thread.sleep(5000); | System.out.println("tick."); | } | } catch (Exception e) { | e.printStackTrace(); | } finally { | try { | session.close(); | connection.close(); | } catch (Exception e) { | e.printStackTrace(); | } | } | System.out.println("Bye"); | } | | } | The MDB which receives messages and forwards them to the temp queue: | package hu.molaris.mdb; | | import javax.ejb.ActivationConfigProperty; | import javax.ejb.MessageDriven; | import javax.jms.Connection; | import javax.jms.ConnectionFactory; | import javax.jms.JMSException; | import javax.jms.Message; | import javax.jms.MessageListener; | import javax.jms.MessageProducer; | import javax.jms.Queue; | import javax.jms.Session; | import javax.jms.TextMessage; | import javax.naming.InitialContext; | | @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), | @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/CommunicationQueue") }) | public class MessageReceiverMDB implements MessageListener { | | private static final String USER_NAME = "testUser"; | | /** | * @see MessageListener#onMessage(Message) | */ | public void onMessage(Message message) { | Connection conn = null; | Session session = null; | try { | TextMessage tm = (TextMessage) message; | | String text = tm.getText(); | System.out.println("message " + text + " received"); | | InitialContext ic = new InitialContext(); | ConnectionFactory cf = (ConnectionFactory) ic.lookup("java:/JmsXA"); | | conn = cf.createConnection(); | conn.start(); | session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); | | /* | * Reading temporary queue name from JBoss Cache | */ | CacheDataHandler cacheDataHandler = ComServCacheDataHandler.getInstance(); | String queue = (String) cacheDataHandler.getData(MessageReceiverMDB.USER_NAME); | | Queue forwardTo = session.createQueue(queue); | MessageProducer producer = 
session.createProducer(forwardTo); | TextMessage forward = session.createTextMessage("Message from device: " + text); | | producer.send(forward); | producer.close(); | | ic.close(); | } catch (Exception e) { | e.printStackTrace(); | System.out.println("The Message Driven Bean failed!"); | } finally { | try { | session.close(); | conn.close(); | } catch (JMSException e) { | e.printStackTrace(); | } | } | } | } | The server, which represents the original sender of the messages: | package hu.molaris.sender; | | import java.util.Properties; | | import javax.jms.Connection; | import javax.jms.ConnectionFactory; | import javax.jms.JMSException; | import javax.jms.MessageProducer; | import javax.jms.Queue; | import javax.jms.Session; | import javax.jms.TextMessage; | import javax.naming.InitialContext; | | public class Sender { | private static final String QUEUE_NAME = "queue/CommunicationQueue"; | | public static void main(String[] args) { | for (int i = 0; i < 10; i++) { | new Sender().run(); | } | } | | public void run() { | Connection connection = null; | Session session = null; | MessageProducer sender; | InitialContext ic; | | try { | Properties props = new Properties(); | props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); | props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200"); | props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); | | ic = new InitialContext(props); | | ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory"); | Queue queue = (Queue) ic.lookup(Sender.QUEUE_NAME); | System.out.println("Queue " + Sender.QUEUE_NAME + " exists"); | | connection = cf.createConnection(); | session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | sender = session.createProducer(queue); | | TextMessage message = session.createTextMessage("test message"); | | connection.start(); | sender.send(message); | | System.out.println("The " + message.getText() + " 
message was successfully sent to the " + queue.getQueueName() + " queue"); | } catch (Exception e) { | e.printStackTrace(); | } finally { | try { | session.close(); | connection.close(); | } catch (JMSException e) { | e.printStackTrace(); | } | } | } | } | If the cluster consist of 1 node, there is no problem. But if it contains 2 or more, I get the error. Thanks! View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4226946#4226946 Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4226946 _______________________________________________ jboss-user mailing list jboss-user@lists.jboss.org https://lists.jboss.org/mailman/listinfo/jboss-user