Sorry for posting this late. Upgrading to CR2 of jboss-messaging solved the hang problem. I am posting the working code anyway.
And these variables need to be set in build.properties:

jms.test.java.naming.provider.url = jnp://<server name>:1099
jms.test.java.naming.factory.initial = org.jnp.interfaces.NamingContextFactory

And the jboss client is a standalone program.

Thanks
Raghu

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;


import junit.framework.TestCase;

/**
 * Publish/subscribe test with a fast publisher, a relayer and a slow consumer, all sharing one
 * JMS connection.
 *
 * <p>The test thread publishes 10000 {@link ObjectMessage}s with the string property
 * {@code target='text0'}. A relayer thread consumes those and re-publishes each one to the same
 * topic with {@code target='text1'}; a consumer thread receives the relayed messages.
 */
public class TestJMS1 extends TestCase {

    Date date;

    /** JNDI name of the connection factory looked up by the test. */
    static final String nameTopicConnFactory = "XAConnectionFactory";
    /** JNDI name of the topic used by the publisher, relayer and consumer. */
    static final String nameTopic = "topic/testTopic";

    /** Milliseconds the consumer thread sleeps before it starts receiving. */
    static final int CONSUMER_WAIT_TIME = 60000;
    /** Milliseconds the relayer thread sleeps before it starts relaying. */
    static final int RELAYER_WAIT_TIME = 30000;

    public TestJMS1(String name) {
        super(name);
    }

    public void setUp() {
    }

    public void tearDown() {
    }

    /**
     * Publishes 10000 messages while a relayer and a consumer thread, each started with a delay,
     * drain the topic through their own sessions.
     *
     * <p>Any {@link Throwable} raised while sending or while waiting for the worker threads is
     * caught and only its message is printed, so such errors do not fail the test. The connection
     * is closed in the {@code finally} block.
     *
     * @throws Exception declared for the JUnit runner
     */
    public void testJMSFastPublisherSlowConsumerWithRelay() throws Exception {
        String testName = "Publish/Subscribe with Slow Consumer and Relayer - AUTO ACK ";
        ConnectionFactory connFactory = null;
        Connection connection = null;
        Session session = null;
        Destination topic = null;
        MyRelayerWithNewSession relayThread = null;
        MyConsumerWithNewSession consumerThread = null;
        WatchdogTimer watchdog = null;

        MessageProducer[] producers = new MessageProducer[1];

        try {
            InitialContext ic = new InitialContext();
            //System.out.println ("Created InitialContext :: " + ic);
            String payload = "The static portion ";

            connFactory = (ConnectionFactory) ic.lookup(nameTopicConnFactory);
            topic = (Topic) ic.lookup(nameTopic);
            connection = connFactory.createConnection();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producers[0] = session.createProducer(topic);

            // Both worker threads start themselves from their constructors.
            relayThread = new MyRelayerWithNewSession(connection, topic, "text0", "text1",
                    RELAYER_WAIT_TIME, false, payload);
            consumerThread = new MyConsumerWithNewSession(connection, topic, "text1",
                    CONSUMER_WAIT_TIME, false, payload);

            connection.start();

            // watchdog = new WatchdogTimer(producers, consumers, 90000, connection);

            //System.out.println("Sender starting");
            try {
                // For threads to create the sessions.
                Thread.sleep(5000);
                for (int i = 0; i < 10000; i++) {
                    ObjectMessage message = session.createObjectMessage();
                    message.setStringProperty("target", "text0");
                    message.setObject(payload + i);
                    producers[0].send(message);
                    // System.out.println("Sent iter : " + i);
                    // Thread.sleep(1000);
                    if (i % 100 == 0) {
                        System.out.println("Sent " + i + "messages");
                    }
                }
            } catch (Throwable t) {
                System.out.println("Producer1 send got Error: " + t.getMessage());
            }

            System.out.println("Done with sending");
            Thread.sleep(200000);
            consumerThread.join();
            relayThread.join();

        } catch (Throwable t) {
            System.err.println("Error: " + t.getMessage());
            // t.printStackTrace(System.err);
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                    connection = null;
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        // if (watchdog.isAlive()) {
        //     watchdog.interrupt();
        // }
        // watchdog.join();

        date = new Date();
        // The watchdog is never constructed while its creation above stays commented out, so
        // guard against null here instead of failing with a NullPointerException. A watchdog
        // that does not exist cannot have fired.
        boolean watchdogFired = watchdog != null && watchdog.interrupted;
        if (!watchdogFired) {
            System.out.println(date.toString() + ": " + testName + " : PASSED");
        } else {
            System.out.println(date.toString() + ": " + testName + " : FAILED");
        }
    }

    /**
     * Thread that receives messages whose {@code target} property equals {@code localTarget} and
     * re-publishes each payload to the same topic with {@code target} set to {@code relayTarget}.
     * It uses two sessions of its own (one to consume, one to produce) on the shared connection.
     */
    class MyRelayerWithNewSession extends Thread {
        Connection connection = null;
        Destination topic = null;
        String localTarget = null;
        String relayTarget = null;
        int delay_ms = 0;
        boolean explicit_ack = false;
        String payload = null;

        Session consumeSession = null;
        Session produceSession = null;

        MessageConsumer consumer = null;
        MessageProducer producer = null;

        /**
         * Stores the arguments and starts the thread immediately.
         *
         * @param connection   shared connection used to create this thread's sessions
         * @param topic        destination to consume from and publish to
         * @param localTarget  value of the {@code target} property this relayer selects on
         * @param relayTarget  value of the {@code target} property set on relayed messages
         * @param delay_ms     milliseconds to sleep after creating the consumer and producer
         * @param explicit_ack whether to call {@code acknowledge()} on each received message
         * @param payload      static payload prefix (only referenced by commented-out checks)
         */
        MyRelayerWithNewSession(Connection connection, Destination topic, String localTarget,
                String relayTarget, int delay_ms, boolean explicit_ack, String payload) {
            this.connection = connection;
            this.topic = topic;
            this.localTarget = localTarget;
            this.relayTarget = relayTarget;
            this.delay_ms = delay_ms;
            this.explicit_ack = explicit_ack;
            this.payload = payload;
            start();
        }

        /** Relays 10000 messages; any error is caught and only its message is printed. */
        public void run() {

            try {
                consumeSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                produceSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                consumer = consumeSession.createConsumer(topic, "target='" + localTarget + "'", false);
                producer = produceSession.createProducer(topic);

                Thread.sleep(delay_ms);
                System.out.println("Relayer thread waking up");
                for (int i = 0; i < 10000; i++) {
                    Message receivedMessage = consumer.receive();
                    if (explicit_ack) {
                        receivedMessage.acknowledge();
                    }
                    String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
                    // System.out.println("Relay Read at iter "+i+" : " + receivedPayload);
                    // System.out.println("Relay iter "+i+" : "+receivedPayload.substring(19));
                    // assertEquals(true, receivedPayload.equals(payload+i));

                    ObjectMessage message = produceSession.createObjectMessage();
                    message.setStringProperty("target", relayTarget);
                    message.setObject(receivedPayload);
                    producer.send(message);

                    if (i % 100 == 0) {
                        System.out.println("Relayed " + i + "messages");
                    }
                }

                System.out.println("Done with Relayer thread");

            } catch (Throwable t) {
                System.err.println("Relayer Thread encountered Error: " + t.getMessage());
                // t.printStackTrace(System.err);
            }
        }

    }

    /**
     * Thread that receives messages whose {@code target} property equals {@code localTarget},
     * using its own session on the shared connection.
     */
    class MyConsumerWithNewSession extends Thread {
        Connection connection = null;
        Destination topic = null;
        String localTarget = null;
        int delay_ms = 0;
        boolean explicit_ack = false;
        String payload = null;

        Session consumeSession = null;
        MessageConsumer consumer = null;

        /**
         * Stores the arguments and starts the thread immediately.
         *
         * @param connection   shared connection used to create this thread's session
         * @param topic        destination to consume from
         * @param localTarget  value of the {@code target} property this consumer selects on
         * @param delay_ms     milliseconds to sleep after creating the consumer
         * @param explicit_ack whether to call {@code acknowledge()} on each received message
         * @param payload      static payload prefix (only referenced by commented-out checks)
         */
        MyConsumerWithNewSession(Connection connection, Destination topic, String localTarget,
                int delay_ms, boolean explicit_ack, String payload) {
            this.connection = connection;
            this.topic = topic;
            this.localTarget = localTarget;
            this.delay_ms = delay_ms;
            this.explicit_ack = explicit_ack;
            this.payload = payload;
            start();
        }

        /** Receives 10000 messages; any error is caught and only its message is printed. */
        public void run() {
            try {
                consumeSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                consumer = consumeSession.createConsumer(topic, "target='" + localTarget + "'", false);

                Thread.sleep(delay_ms);
                System.out.println("Receiver waking up");
                for (int i = 0; i < 10000; i++) {
                    Message receivedMessage = consumer.receive();
                    if (explicit_ack) {
                        receivedMessage.acknowledge();
                    }
                    String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
                    // System.out.println("Receive Read at iter "+i+" : " + receivedPayload);
                    // System.out.println("Receive iter "+i+" : "+receivedPayload.substring(19));
                    // assertEquals(true, receivedPayload.equals(payload+i));

                    if (i % 100 == 0) {
                        System.out.println("Received " + i + "messages");
                    }
                }

                System.out.println("Done with Consumer thread");

            } catch (Throwable t) {
                System.err.println("Receiver Thread encountered Error: " + t.getMessage());
                // t.printStackTrace(System.err);
            }
        }
    }

    /**
     * Thread that sleeps for {@code waitTime} milliseconds and then, unless it was interrupted
     * first, sets {@link #interrupted} and closes every consumer, every producer and the
     * connection it was given.
     */
    static class WatchdogTimer extends Thread {

        long waitTime = 0L;
        /** Set to {@code true} once the wait time has elapsed without an interrupt. */
        boolean interrupted = false;
        Connection connection = null;
        MessageProducer[] producers = null;
        MessageConsumer[] consumers = null;

        /**
         * Stores the arguments and starts the thread immediately.
         *
         * @param producers  producers to close when the timer fires
         * @param consumers  consumers to close when the timer fires
         * @param waitTime   milliseconds to sleep before firing
         * @param connection connection to close when the timer fires
         */
        public WatchdogTimer(MessageProducer[] producers, MessageConsumer[] consumers,
                long waitTime, Connection connection) {
            this.producers = producers;
            this.consumers = consumers;
            this.waitTime = waitTime;
            this.connection = connection;
            start();
        }

        public void run() {

            try {
                Thread.sleep(this.waitTime);
                this.interrupted = true;

                System.out.println("Watchdog waking up: closing producers and consumers");
                // Close each element; calling close() on the array itself does not compile.
                for (int i = 0; i < this.consumers.length; i++) {
                    this.consumers[i].close();
                }
                for (int i = 0; i < this.producers.length; i++) {
                    this.producers[i].close();
                }

                connection.close();
            } catch (InterruptedException thrExc) {
                System.out.println("watchdog interrupted");
            } catch (JMSException jmsExc) {
                System.out.println("watchdog got jmsExc");
            }

        }

    }

}

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3953539#3953539

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=3953539

Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
JBoss-user mailing list
JBoss-user@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/jboss-user