Repository: cxf Updated Branches: refs/heads/jms-exception-handling [created] 30f0743d0
[CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/30f0743d Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/30f0743d Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/30f0743d Branch: refs/heads/jms-exception-handling Commit: 30f0743d01a3b3060b35622870de6481c098742a Parents: b43dfb9 Author: Christian Schneider <ch...@die-schneider.net> Authored: Thu Apr 20 14:15:29 2017 +0200 Committer: Christian Schneider <ch...@die-schneider.net> Committed: Thu Apr 20 14:15:29 2017 +0200 ---------------------------------------------------------------------- .../cxf/transport/jms/JMSDestination.java | 7 +- .../util/PollingMessageListenerContainer.java | 80 +++++--------------- .../transport/jms/util/MessageListenerTest.java | 9 ++- 3 files changed, 29 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index 2d7f4db..9794cd5 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -118,19 +118,20 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess Session session = null; try { connection = JMSFactory.createConnection(jmsConfig); - connection.setExceptionListener(new ExceptionListener() { + ExceptionListener exListener = new ExceptionListener() { public void onException(JMSException exception) { if (!shutdown) { LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", exception); restartConnection(); } } - }); + }; session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = jmsConfig.getTargetDestination(session); PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, - destination, this); + destination, + this, exListener); container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers()); container.setTransactionManager(jmsConfig.getTransactionManager()); container.setMessageSelector(jmsConfig.getMessageSelector()); http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index c4276eb..815bcf1 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -23,6 +23,7 @@ import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -36,12 +37,14 @@ import org.apache.cxf.common.logging.LogUtils; public class PollingMessageListenerContainer extends AbstractMessageListenerContainer { private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class); + private ExceptionListener exceptionListener; public PollingMessageListenerContainer(Connection connection, Destination destination, - MessageListener listenerHandler) { + MessageListener listenerHandler, ExceptionListener exceptionListener) { this.connection = connection; this.destination = destination; this.listenerHandler = listenerHandler; + this.exceptionListener = exceptionListener; } private class Poller extends AbstractPoller implements Runnable { @@ -49,11 +52,10 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont @Override public void run() { Session session = null; - init(); while (running) { try (ResourceCloser closer = new ResourceCloser()) { closer.register(createInitialContext()); - // Create session early to optimize performance + // Create session early to optimize performance // In session = closer.register(connection.createSession(transacted, acknowledgeMode)); MessageConsumer consumer = closer.register(createConsumer(session)); while (running) { @@ -70,14 +72,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont safeRollBack(session); } } - } catch (Throwable e) { - catchUnexpectedExceptionDuringPolling(null, e); + } catch (Exception e) { + handleException(e); } } - } - @Override protected void safeRollBack(Session session) { try { if (session != null && session.getTransacted()) { @@ -94,7 +94,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont @Override public void run() { - init(); while (running) { try (ResourceCloser closer = new ResourceCloser()) { closer.register(createInitialContext()); @@ -121,14 +120,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont safeRollBack(session); } } catch (Exception e) { - catchUnexpectedExceptionDuringPolling(null, e); + handleException(e); } - } } - @Override protected void safeRollBack(Session session) { try { transactionManager.rollback(); @@ -140,61 +137,18 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } private abstract class AbstractPoller { - private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception"; - private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry"; - protected int retryCounter = -1; - protected int counter; - protected int sleepingTime = 5000; - - protected void init() { - if (jndiEnvironment != null) { - if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) { - retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION)); - } - if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) { - sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY)); - } - } - } - - protected boolean hasToCount() { - return retryCounter > -1; - } - - protected boolean hasToStop() { - return counter > retryCounter; - } - - protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e) { + protected void handleException(Exception e) { LOG.log(Level.WARNING, "Unexpected exception.", e); - if (hasToCount()) { - counter++; - if (hasToStop()) { - stop(session, e); - } - } - if (running) { - try { - String log = "Now sleeping for " + sleepingTime / 1000 + " seconds"; - log += hasToCount() - ? ". Then restarting session and consumer: attempt " + counter + "/" + retryCounter - : ""; - LOG.log(Level.WARNING, log); - Thread.sleep(sleepingTime); - } catch (InterruptedException e1) { - LOG.log(Level.WARNING, e1.getMessage()); - } - } - } - - protected void stop(Session session, Throwable e) { - LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e); - safeRollBack(session); running = false; + JMSException wrapped; + if (e instanceof JMSException) { + wrapped = (JMSException) e; + } else { + wrapped = new JMSException(""); + wrapped.addSuppressed(e); + } + PollingMessageListenerContainer.this.exceptionListener.onException(wrapped); } - - protected abstract void safeRollBack(Session session); - } private MessageConsumer createConsumer(Session session) throws JMSException { http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index 82cc37a..4adf921 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms.util; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -52,8 +53,14 @@ public class MessageListenerTest { Queue dest = JMSUtil.createQueue(connection, "test"); MessageListener listenerHandler = new TestMessageListener(); + ExceptionListener exListener = new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + } + }; PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest, - listenerHandler); + listenerHandler, exListener); container.setTransacted(false); container.setAcknowledgeMode(Session.SESSION_TRANSACTED);