Hello! I have a process communicating with JBoss via JMS, and I would like it to recover gracefully when JBoss is rebooted. I thought that if I registered an ExceptionListener with my QueueConnection, it would be invoked when JBoss goes down, but I haven't observed that. I've enclosed an example program to demonstrate what I'm trying to do; its output is:
Starting JBoss...JBoss is up Sending message...enqueued...received Shutting down JBoss...JBoss is down Starting JBoss...JBoss is up Sending message...enqueued... [Then it hangs.] Any pointers as to where I'm going wrong are much appreciated. Thanks! Nate | import java.io.*; | import java.lang.IllegalStateException; | import java.util.*; | import javax.jms.*; | import javax.naming.*; | | public final class JBossRebootExperiment2 { | public static void main(final String[] args) { | Listener listener = null; | try { | Process jboss = startJBoss(); | try { | final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME); | final QueueConnection connection = createQueueConnection(); | final QueueSession session = connection.createQueueSession(true, 0); | listener = new Listener(session); | try { | session.createReceiver(queue).setMessageListener(listener); | connection.setExceptionListener(listener); | connection.start(); | sendMessage(listener); | | try { | shutdownJBoss(jboss); | } | finally { | jboss = null; | } | | try { | Thread.sleep(5000); | } | catch (final InterruptedException e) { | } | | jboss = startJBoss(); | sendMessage(listener); | } | finally { | safeClose(connection); | System.out.println("Messages successfully transmitted: " + listener.messagesReceived); | System.out.println("Exceptions received: " + listener.exceptionsReceived); | } | } | finally { | if (jboss != null) { | shutdownJBoss(jboss); | } | } | } | catch (final Exception e) { | e.printStackTrace(); | } | } | | private static final class Listener implements MessageListener, ExceptionListener { | public Listener(final QueueSession session) { | this.session = session; | } | | public synchronized void onMessage(final Message message) { | try { | session.commit(); | ++messagesReceived; | notifyAll(); | } | catch (final JMSException e) { | e.printStackTrace(); | } | } | | public synchronized void onException(final JMSException e) { | ++exceptionsReceived; | notifyAll(); | } | | public volatile 
int messagesReceived; | public volatile int exceptionsReceived; | private final QueueSession session; | } | | private static QueueConnection createQueueConnection() | throws JMSException, NamingException { | final QueueConnectionFactory qcf = (QueueConnectionFactory) | getJBossContext().lookup(CONNECTION_FACTORY_NAME); | return qcf.createQueueConnection(); | } | | private static void sendMessage(final Listener listener) | throws InterruptedException, JMSException, NamingException { | System.out.print("Sending message..."); | final int oldMessagesReceived = listener.messagesReceived; | sendMsg(); | System.out.print("enqueued..."); | synchronized (listener) { | while (oldMessagesReceived == listener.messagesReceived) listener.wait(); | } | System.out.println("received"); | } | | private static void sendMsg() throws JMSException, NamingException { | final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME); | final QueueConnection connection = createQueueConnection(); | try { | final QueueSession session = connection.createQueueSession(true, 0); | final QueueSender sender = session.createSender(queue); | sender.setDeliveryMode(DeliveryMode.PERSISTENT); | sender.send(session.createTextMessage("foo")); | session.commit(); | } | finally { | safeClose(connection); | } | } | | private static void safeClose(final QueueConnection qc) { | try { | qc.close(); | } | catch (final JMSException e) { | System.out.print("QueueConnection.close failed: "); | e.printStackTrace(); | } | } | | private static void safeClose(final Reader reader) { | try { | reader.close(); | } | catch (final IOException e) { | System.out.print("Reader.close failed: "); | e.printStackTrace(); | } | } | | private static Context getJBossContext() throws NamingException { | final Hashtable ht = new Hashtable(); | ht.put(Context.INITIAL_CONTEXT_FACTORY, | "org.jnp.interfaces.NamingContextFactory"); | ht.put(Context.PROVIDER_URL, "jnp://localhost"); | ht.put(Context.URL_PKG_PREFIXES, | 
"org.jboss.naming:org.jnp.interfaces"); | return new InitialContext(ht); | } | | private static Process startJBoss() throws IOException { | System.out.print("Starting JBoss..."); | final Process jboss = Runtime.getRuntime().exec(JBOSS_DIR + "/bin/run.sh"); | final BufferedReader reader = new BufferedReader | (new InputStreamReader(jboss.getInputStream())); | try { | String line = reader.readLine(); | while (line.indexOf("JBoss") == -1 || line.indexOf("MicroKernel") == -1 || | line.indexOf("Started") == -1) { | line = reader.readLine(); | } | System.out.println("JBoss is up"); | } | finally { | safeClose(reader); | } | | return jboss; | } | | private static void shutdownJBoss(final Process jboss) throws IOException { | System.out.print("Shutting down JBoss..."); | try { | // Give JBoss 15 seconds to go peacefully | Runtime.getRuntime().exec(JBOSS_DIR + "/bin/shutdown.sh -S"); | final Interrupter interrupter = | new Interrupter(Thread.currentThread(), 15000); | jboss.waitFor(); | interrupter.cancel(); | } | catch (final InterruptedException e) { | killJBoss(); | } | System.out.println("JBoss is down"); | } | | private static void killJBoss() throws IOException { | for (int i = 0; i < 2; ++i) { | final Process psef = Runtime.getRuntime().exec("ps -ef"); | final BufferedReader reader = new BufferedReader | (new InputStreamReader(psef.getInputStream())); | try { | String line; | while ((line = reader.readLine()) != null) { | if (line.indexOf("org.jboss.Main") >= 0) { | Runtime.getRuntime().exec("kill -9 " + line.substring(9, 14)); | } | } | } | finally { | safeClose(reader); | } | } | } | | private static final class Interrupter implements Runnable { | public Interrupter(final Thread thread2interrupt, final long millis2wait) { | if (thread2interrupt == null) { | throw new NullPointerException("thread2interrupt must be non-null"); | } | else if (millis2wait < 0) { | throw new IllegalArgumentException("millis2wait must be >= 0"); | } | | this.thread2interrupt = 
thread2interrupt; | this.millis2wait = millis2wait; | interrupterThread = new Thread(this); | interrupterThread.start(); | } | | public void cancel() { | interrupterThread.interrupt(); | } | | public void run() { | if (Thread.currentThread() != interrupterThread) { | throw new IllegalStateException("Wrong thread"); | } | | try { | Thread.sleep(millis2wait); | thread2interrupt.interrupt(); | } | catch (final InterruptedException e) { | } | } | | private final Thread thread2interrupt; | private final Thread interrupterThread; | private final long millis2wait; | } | | | | private static final String QUEUE_NAME = "queue/A"; | private static final String CONNECTION_FACTORY_NAME = "ConnectionFactory"; | private static final String JBOSS_DIR = "/opt/jboss"; | } | <a href="http://www.jboss.org/index.html?module=bb&op=viewtopic&p=3826619#3826619">View the original post</a> <a href="http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=3826619>Reply to the post</a> ------------------------------------------------------- This SF.Net email is sponsored by: IBM Linux Tutorials Free Linux tutorial presented by Daniel Robbins, President and CEO of GenToo technologies. Learn everything from fundamentals to system administration.http://ads.osdn.com/?ad_id=1470&alloc_id=3638&op=click _______________________________________________ JBoss-user mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-user