Sorry for posting this late.

Upgrading to CR2 of jboss-messaging solved the hang problem. I am posting the 
working code anyway. 

The following variables need to be set in build.properties:

jms.test.java.naming.provider.url = jnp://<server name>:1099
jms.test.java.naming.factory.initial = org.jnp.interfaces.NamingContextFactory

Note that the JBoss client is a standalone program.

Thanks
Raghu


  | import java.util.Date;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.Destination;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageProducer;
  | import javax.jms.ObjectMessage;
  | import javax.jms.Session;
  | import javax.jms.Topic;
  | import javax.naming.InitialContext;
  | 
  | 
  | import junit.framework.TestCase;
  | 
  | public class TestJMS1 extends TestCase {
  | 
  |     Date date;
  |     
  |     static final String nameTopicConnFactory = "XAConnectionFactory";
  |     static final String nameTopic            = "topic/testTopic";
  |     
  |     static final int CONSUMER_WAIT_TIME      = 60000;
  |     static final int RELAYER_WAIT_TIME       = 30000;
  |     
  |     public TestJMS1(String name) {
  |       super(name);
  |     }
  | 
  |     public void setUp() {
  |     }
  | 
  |     public void tearDown() {
  |     }
  |     
  |     public void testJMSFastPublisherSlowConsumerWithRelay() throws 
Exception {
  |       String testName = "Publish/Subscribe with Slow Consumer and Relayer - 
AUTO ACK ";
  |       ConnectionFactory connFactory = null;
  |       Connection connection = null;
  |       Session session = null;
  |       Destination topic = null;
  |       MyRelayerWithNewSession relayThread = null;
  |       MyConsumerWithNewSession consumerThread = null;
  |       WatchdogTimer watchdog = null;
  |       
  |       MessageProducer[] producers = new MessageProducer[1];
  |       
  |       try {    
  |         InitialContext ic = new InitialContext ();
  |         //System.out.println ("Created InitialContext :: " + ic);
  |         String payload = "The static portion ";
  |         
  |         connFactory = (ConnectionFactory) ic.lookup (nameTopicConnFactory);
  |         topic = (Topic)ic.lookup(nameTopic);
  |         connection = connFactory.createConnection();
  |         
  |         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  |         producers[0] = session.createProducer(topic);
  |       
  |         relayThread = new MyRelayerWithNewSession(connection, topic, 
"text0", "text1",
  |           RELAYER_WAIT_TIME, false, payload);
  |         consumerThread = new MyConsumerWithNewSession(connection, topic, 
"text1", 
  |           CONSUMER_WAIT_TIME, false, payload);
  | 
  |         connection.start();
  |         
  |         // watchdog = new WatchdogTimer(producers, consumers, 90000, 
connection);
  |         
  |         //System.out.println("Sender starting");
  |         try {
  |         // For threads to create the sessions.
  |         Thread.sleep(5000);
  |         for(int i=0; i<10000; i++) {
  |           ObjectMessage message = session.createObjectMessage();
  |           message.setStringProperty("target", "text0");
  |           message.setObject(payload+i);
  |           producers[0].send(message);
  |           // System.out.println("Sent iter : " + i);
  |           // Thread.sleep(1000);
  |           if (i%100 == 0) {
  |             System.out.println("Sent " + i + "messages");  
  |           }
  |         }
  |         } catch (Throwable t ) {
  |         System.out.println("Producer1 send got Error: "+t.getMessage());
  |         }
  |         
  |         System.out.println("Done with sending");
  |         Thread.sleep(200000);
  |         consumerThread.join();
  |         relayThread.join();
  |         
  |       } catch (Throwable t) {
  |         System.err.println("Error: "+t.getMessage());
  |         // t.printStackTrace(System.err);
  |       } finally {
  |         try {
  |         if (connection != null){
  |           connection.close();
  |           connection = null;
  |         }
  |         } catch (JMSException e) {
  |         e.printStackTrace();
  |         }
  |       }
  |       
  | //      if (watchdog.isAlive()) {
  | //        watchdog.interrupt();
  | //      }
  | //      watchdog.join();
  |       
  |       date = new Date();
  |       if (!watchdog.interrupted) {
  |         System.out.println(date.toString()+": "+testName + " : PASSED");
  |       } else {
  |         System.out.println(date.toString()+": "+testName + " : FAILED");
  |       }
  |     }
  |     
  |     class MyRelayerWithNewSession extends Thread {
  |       Connection connection = null;
  |       Destination topic = null;
  |       String localTarget = null;
  |       String relayTarget = null;
  |       int delay_ms = 0;
  |       boolean explicit_ack = false;
  |       String payload = null;
  |       
  |       Session consumeSession = null;
  |       Session produceSession = null;
  |       
  |       MessageConsumer consumer = null;
  |       MessageProducer producer = null;
  |       
  |       MyRelayerWithNewSession(Connection connection, Destination topic, 
String localTarget, 
  |         String relayTarget, int delay_ms, boolean explicit_ack, String 
payload) {
  |         this.connection = connection;
  |         this.topic = topic;
  |         this.localTarget = localTarget;
  |         this.relayTarget = relayTarget;
  |         this.delay_ms = delay_ms;
  |         this.explicit_ack = explicit_ack;
  |         this.payload = payload;
  |         start();
  |       }
  | 
  |       public void run(){
  |         
  |         try {
  |         consumeSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
  |         produceSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
  |         
  |         consumer = consumeSession.createConsumer(topic, 
"target='"+localTarget+"'", false);
  |         producer = produceSession.createProducer(topic);
  |   
  |         Thread.sleep(delay_ms);
  |         System.out.println("Relayer thread waking up");
  |         for (int i=0; i< 10000; i ++) { 
  |           Message receivedMessage = consumer.receive();
  |           if (explicit_ack) {
  |             receivedMessage.acknowledge();
  |           }
  |           String receivedPayload = (String) ((ObjectMessage) 
receivedMessage).getObject();
  |           // System.out.println("Relay Read at iter "+i+" : " + 
receivedPayload);
  |           // System.out.println("Relay iter "+i+" : 
"+receivedPayload.substring(19));
  |           // assertEquals(true, receivedPayload.equals(payload+i));
  |           
  |           ObjectMessage message = produceSession.createObjectMessage();
  |           message.setStringProperty("target", relayTarget);
  |           message.setObject(receivedPayload);
  |           producer.send(message);
  | 
  |           if (i%100 == 0) {
  |             System.out.println("Relayed " + i + "messages");       
  |           }
  |         }
  |   
  |         System.out.println("Done with Relayer thread");
  |   
  |         } catch (Throwable t) {
  |         System.err.println("Relayer Thread encountered Error: 
"+t.getMessage());
  |         // t.printStackTrace(System.err);
  |         }
  |       }
  |       
  |     }
  |     
  |     class MyConsumerWithNewSession extends Thread {
  |       Connection connection = null;
  |       Destination topic = null;
  |       String localTarget = null;
  |       int delay_ms = 0;
  |       boolean explicit_ack = false;
  |       String payload = null;
  |       
  |       Session consumeSession = null;
  |       MessageConsumer consumer = null;
  |       
  |       MyConsumerWithNewSession(Connection connection, Destination topic, 
String localTarget, 
  |         int delay_ms, boolean explicit_ack, String payload) {
  |         this.connection = connection;
  |         this.topic = topic;
  |         this.localTarget = localTarget;
  |         this.delay_ms = delay_ms;
  |         this.explicit_ack = explicit_ack;
  |         this.payload = payload;
  |         start();
  |       }
  |       
  |       public void run(){
  |         try {
  |         consumeSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
  |         consumer = consumeSession.createConsumer(topic, 
"target='"+localTarget+"'", false);
  | 
  |         Thread.sleep(delay_ms);
  |         System.out.println("Receiver waking up");
  |         for (int i=0; i< 10000; i ++) {
  |           Message receivedMessage = consumer.receive();
  |           if (explicit_ack) {
  |             receivedMessage.acknowledge(); 
  |           }
  |           String receivedPayload = (String) ((ObjectMessage) 
receivedMessage).getObject();
  |           // System.out.println("Receive Read at iter "+i+" : " + 
receivedPayload);
  |           // System.out.println("Receive iter "+i+" : 
"+receivedPayload.substring(19));
  |           // assertEquals(true, receivedPayload.equals(payload+i));
  |           
  |           if (i%100 == 0) {         
  |             System.out.println("Received " + i + "messages");
  |           }       
  |         }
  |   
  |         System.out.println("Done with Consumer thread");
  |   
  |         } catch (Throwable t) {
  |         System.err.println("Receiver Thread encountered Error: 
"+t.getMessage());
  |         // t.printStackTrace(System.err);
  |         }
  |       }
  |     }
  |     
  |     static class WatchdogTimer extends Thread {
  | 
  |       long waitTime = 0L;
  |       boolean interrupted = false;
  |       Connection connection = null;
  |       MessageProducer[] producers = null;
  |       MessageConsumer [] consumers = null;
  |       
  |       public WatchdogTimer (MessageProducer[] producers, MessageConsumer [] 
consumers,
  |         long waitTime, Connection connection) {
  |         this.producers = producers;
  |         this.consumers = consumers;
  |         this.waitTime = waitTime;
  |         this.connection = connection;
  |         start();
  |       }
  |       
  |       public void run () {
  |         
  |         try {
  |         Thread.sleep(this.waitTime);
  |         this.interrupted = true;
  |   
  |         System.out.println("Watchdog waking up: closing producers and 
consumers");
  |         for (int i = 0; i < this.consumers.length; i++) {
  |           this.consumers.close();
  |         }
  |         for (int i = 0; i < this.producers.length; i++) {
  |           this.producers.close();
  |         }
  |   
  |         connection.close();
  |         } catch (InterruptedException thrExc) {
  |         System.out.println("watchdog interrupted");
  |         } catch (JMSException jmsExc) {
  |         System.out.println("watchdog got jmsExc");
  |         }
  | 
  |       } 
  |       
  |     }
  | 
  | }
  | 

View the original post : 
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3953539#3953539

Reply to the post : 
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=3953539

Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
JBoss-user mailing list
JBoss-user@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/jboss-user

Reply via email to