Repository: cxf Updated Branches: refs/heads/master 0dfaf8d72 -> e1c60863a
[CXF-6742] Run jms destination with InitialContext Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e1c60863 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e1c60863 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e1c60863 Branch: refs/heads/master Commit: e1c60863ac10b56d423613c7d3d2f45c7ce18e14 Parents: 68f110e Author: Christian Schneider <ch...@die-schneider.net> Authored: Fri Jan 15 14:41:02 2016 +0100 Committer: Christian Schneider <ch...@die-schneider.net> Committed: Fri Jan 15 14:41:20 2016 +0100 ---------------------------------------------------------------------- .../cxf/transport/jms/JMSDestination.java | 1 + .../util/AbstractMessageListenerContainer.java | 29 ++++++++++++++++++++ .../cxf/transport/jms/util/JndiHelper.java | 20 +------------- .../util/PollingMessageListenerContainer.java | 24 ++++++---------- .../cxf/transport/jms/AbstractJMSTester.java | 2 +- .../cxf/transport/jms/JMSDestinationTest.java | 1 + 6 files changed, 41 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index 113a7d2..2b5d8cd 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -144,6 +144,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess if (executor instanceof Executor) { container.setExecutor((Executor) executor); } + container.setJndiEnvironment(jmsConfig.getJndiEnvironment()); container.start(); 
suspendedContinuations.setListenerContainer(container); connection.start(); http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java index 65d6c4c..8fd1cdc 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java @@ -18,16 +18,20 @@ */ package org.apache.cxf.transport.jms.util; +import java.util.Properties; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageListener; import javax.jms.Session; +import javax.naming.InitialContext; +import javax.naming.NamingException; import javax.transaction.TransactionManager; import org.apache.cxf.common.logging.LogUtils; @@ -45,6 +49,7 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon protected String durableSubscriptionName; protected boolean pubSubNoLocal; protected TransactionManager transactionManager; + protected Properties jndiEnvironment; private Executor executor; private int concurrentConsumers = 1; @@ -84,7 +89,31 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon public void setExecutor(Executor executor) { this.executor = executor; } + + public void setJndiEnvironment(Properties jndiEnvironment) { + this.jndiEnvironment = jndiEnvironment; + } 
+ /** + * Creates an InitialContext if a JNDI environment has been provided. + * This is useful in e.g. weblogic, where interaction with JNDI JMS resources is secured. + * + * Be careful not to cache the return value in a non thread local scope. + * + * @return an initial context, with the endpoint's JNDI properties, + * or null if none is provided or if an error occurs + **/ + public InitialContext createInitialContext() { + if (jndiEnvironment != null) { + try { + return new InitialContext(this.jndiEnvironment); + } catch (NamingException e) { + LOG.log(Level.SEVERE, "Could not expose JNDI environment to JMS thread context", e); + } + } + return null; + } + @Override public void stop() { // In case of using external executor, don't shutdown it http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java index 7cc0e42..5035e0d 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java @@ -18,8 +18,6 @@ */ package org.apache.cxf.transport.jms.util; -import java.util.Enumeration; -import java.util.Hashtable; import java.util.Properties; import javax.naming.Context; @@ -38,25 +36,9 @@ public class JndiHelper { this.environment = environment; } - @SuppressWarnings("rawtypes") - protected Context createInitialContext() throws NamingException { - //CHECKSTYLE:OFF - Hashtable<Object, Object> icEnv = new Hashtable<Object, Object>(environment.size()); - //CHECKSTYLE:ON - for (Enumeration en = environment.propertyNames(); en.hasMoreElements();) { - String key = (String)en.nextElement(); - Object value = 
environment.getProperty(key); - if (value == null) { - value = environment.get(key); - } - icEnv.put(key, value); - } - return new InitialContext(icEnv); - } - @SuppressWarnings("unchecked") public <T> T lookup(final String name, Class<T> requiredType) throws NamingException { - Context ctx = createInitialContext(); + Context ctx = new InitialContext(this.environment); try { Object located = ctx.lookup(name); if (located == null) { http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index 0acd40f..9f8fcb2 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -49,12 +49,11 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont @Override public void run() { while (running) { - MessageConsumer consumer = null; - Session session = null; - try { + try (ResourceCloser closer = new ResourceCloser()) { + closer.register(createInitialContext()); // Create session early to optimize performance - session = connection.createSession(transacted, acknowledgeMode); - consumer = createConsumer(session); + Session session = closer.register(connection.createSession(transacted, acknowledgeMode)); + MessageConsumer consumer = closer.register(createConsumer(session)); while (running) { Message message = consumer.receive(1000); try { @@ -71,9 +70,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } } catch (Exception e) { LOG.log(Level.WARNING, "Unexpected 
exception. Restarting session and consumer", e); - } finally { - ResourceCloser.close(consumer); - ResourceCloser.close(session); } } @@ -96,9 +92,8 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont @Override public void run() { while (running) { - MessageConsumer consumer = null; - Session session = null; - try { + try (ResourceCloser closer = new ResourceCloser()) { + closer.register(createInitialContext()); final Transaction externalTransaction = transactionManager.getTransaction(); if ((externalTransaction != null) && (externalTransaction.getStatus() == Status.STATUS_ACTIVE)) { LOG.log(Level.SEVERE, "External transactions are not supported in XAPoller"); @@ -109,8 +104,8 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont * Create session inside transaction to give it the * chance to enlist itself as a resource */ - session = connection.createSession(transacted, acknowledgeMode); - consumer = createConsumer(session); + Session session = closer.register(connection.createSession(transacted, acknowledgeMode)); + MessageConsumer consumer = closer.register(createConsumer(session)); Message message = consumer.receive(1000); try { if (message != null) { @@ -120,9 +115,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } catch (Throwable e) { LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e); safeRollBack(session); - } finally { - ResourceCloser.close(consumer); - ResourceCloser.close(session); } } catch (Exception e) { LOG.log(Level.WARNING, "Unexpected exception. 
Restarting session and consumer", e); http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java index 7abbfe2..b34f53d 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java @@ -221,7 +221,7 @@ public abstract class AbstractJMSTester extends Assert { protected void waitForReceiveDestMessage() { int waitTime = 0; - while (destMessage == null && waitTime < MAX_RECEIVE_TIME) { + while (destMessage == null && waitTime < MAX_RECEIVE_TIME * 10) { try { Thread.sleep(100); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java index db7e241..690c721 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java @@ -241,6 +241,7 @@ public class JMSDestinationTest extends AbstractJMSTester { waitForReceiveInMessage(); verifyReceivedMessage(inMessage); + // wait for a while for the jms session recycling Thread.sleep(1000); conduit.close(); destination.shutdown();