[KARAF-5129] JMS pooling + support for Artemis Project: http://git-wip-us.apache.org/repos/asf/karaf/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf/commit/dba4b37c Tree: http://git-wip-us.apache.org/repos/asf/karaf/tree/dba4b37c Diff: http://git-wip-us.apache.org/repos/asf/karaf/diff/dba4b37c
Branch: refs/heads/master Commit: dba4b37cbb72c41fca3823cd4d0a2a95d282415f Parents: a8b9fd7 Author: Guillaume Nodet <gno...@apache.org> Authored: Thu May 18 14:12:05 2017 +0200 Committer: Guillaume Nodet <gno...@apache.org> Committed: Sat May 20 06:33:05 2017 +0200 ---------------------------------------------------------------------- .../enterprise/src/main/feature/feature.xml | 7 +- jms/core/pom.xml | 5 +- .../apache/karaf/jms/command/CreateCommand.java | 4 +- .../ActiveMQDestinationSourceFactory.java | 68 +++ .../ArtemisDestinationSourceFactory.java | 61 +++ .../karaf/jms/internal/DestinationSource.java | 35 ++ .../apache/karaf/jms/internal/JmsConnector.java | 20 +- .../karaf/jms/internal/JmsServiceImpl.java | 179 +++--- .../apache/karaf/jms/internal/JsonReader.java | 352 ++++++++++++ .../karaf/jms/internal/osgi/Activator.java | 33 ++ .../resources/OSGI-INF/blueprint/jms-core.xml | 55 -- .../jms/internal/connectionfactory-activemq.xml | 20 +- .../jms/internal/connectionfactory-artemis.xml | 35 ++ .../internal/connectionfactory-webspheremq.xml | 19 +- jms/pom.xml | 3 +- jms/pool/pom.xml | 104 ++++ .../karaf/jms/pool/internal/ConnectionKey.java | 75 +++ .../karaf/jms/pool/internal/ConnectionPool.java | 315 +++++++++++ .../jms/pool/internal/IntrospectionSupport.java | 123 +++++ .../jms/pool/internal/PooledConnection.java | 285 ++++++++++ .../pool/internal/PooledConnectionFactory.java | 538 +++++++++++++++++++ .../pool/internal/PooledMessageConsumer.java | 76 +++ .../karaf/jms/pool/internal/PooledProducer.java | 168 ++++++ .../jms/pool/internal/PooledQueueSender.java | 51 ++ .../karaf/jms/pool/internal/PooledSession.java | 500 +++++++++++++++++ .../internal/PooledSessionEventListener.java | 48 ++ .../jms/pool/internal/PooledTopicPublisher.java | 57 ++ .../karaf/jms/pool/internal/SessionKey.java | 65 +++ .../karaf/jms/pool/internal/osgi/Activator.java | 180 +++++++ 29 files changed, 3286 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/assemblies/features/enterprise/src/main/feature/feature.xml ---------------------------------------------------------------------- diff --git a/assemblies/features/enterprise/src/main/feature/feature.xml b/assemblies/features/enterprise/src/main/feature/feature.xml index 90a7c5c..4fb3cee 100644 --- a/assemblies/features/enterprise/src/main/feature/feature.xml +++ b/assemblies/features/enterprise/src/main/feature/feature.xml @@ -175,10 +175,13 @@ <feature name="jms" description="JMS service and commands" version="${project.version}"> <details>JMS support provinding service, commands, and MBean.</details> - <feature version="[2,3)">transaction</feature> - <feature>aries-blueprint</feature> + <feature dependency="true">aries-blueprint</feature> + <bundle dependency="true">mvn:javax.jms/javax.jms-api/2.0</bundle> + <bundle dependency="true">mvn:org.apache.geronimo.specs/geronimo-jta_1.1_spec/${geronimo.jta-spec.version}</bundle> <bundle dependency="true">mvn:org.apache.geronimo.specs/geronimo-jms_1.1_spec/${geronimo.jms-spec.version}</bundle> + <bundle dependency="true">mvn:org.apache.commons/commons-pool2/2.4.2</bundle> <bundle>mvn:org.apache.karaf.jms/org.apache.karaf.jms.core/${project.version}</bundle> + <bundle>mvn:org.apache.karaf.jms/org.apache.karaf.jms.pool/${project.version}</bundle> <!-- Requirement on Blueprint. We don't use a feature dependency to allow the choice between aries-blueprint and gemini-blueprint. http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/pom.xml ---------------------------------------------------------------------- diff --git a/jms/core/pom.xml b/jms/core/pom.xml index cb65b18..22da830 100644 --- a/jms/core/pom.xml +++ b/jms/core/pom.xml @@ -99,9 +99,10 @@ <configuration> <instructions> <Export-Package> - org.apache.karaf.jms + org.apache.karaf.jms;-noimport:=true </Export-Package> <Import-Package> + javax.jms;version="[1.1,3)", org.apache.activemq*;resolution:=optional, * </Import-Package> @@ -109,9 +110,9 @@ org.apache.karaf.jms.command, org.apache.karaf.jms.command.completers, org.apache.karaf.jms.internal, + org.apache.karaf.jms.internal.osgi, org.apache.karaf.util </Private-Package> - <DynamicImport-Package>*</DynamicImport-Package> </instructions> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/command/CreateCommand.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/command/CreateCommand.java b/jms/core/src/main/java/org/apache/karaf/jms/command/CreateCommand.java index fb91416..64ccf02 100644 --- a/jms/core/src/main/java/org/apache/karaf/jms/command/CreateCommand.java +++ b/jms/core/src/main/java/org/apache/karaf/jms/command/CreateCommand.java @@ -30,8 +30,8 @@ public class CreateCommand extends JmsCommandSupport { @Argument(index = 0, name = "name", description = "The JMS connection factory name", required = true, multiValued = false) String name; - @Option(name = "-t", aliases = { "--type" }, description = "The JMS connection factory type (ActiveMQ or WebsphereMQ)", required = false, multiValued = false) - @Completion(value = StringsCompleter.class, values = { "activemq", "webspheremq" }) + @Option(name = "-t", aliases = { "--type" }, description = "The JMS connection factory type (ActiveMQ, Artemis or WebsphereMQ)", required = false, multiValued = false) + @Completion(value = StringsCompleter.class, values = { "activemq", "artemis", "webspheremq" }) String type = "ActiveMQ"; @Option(name = "--url", description = "URL of the JMS broker. For WebsphereMQ type, the URL is hostname/port/queuemanager/channel", required = false, multiValued = false) http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java new file mode 100644 index 0000000..a6e5cb7 --- /dev/null +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Topic; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +class ActiveMQDestinationSourceFactory implements DestinationSource.Factory { + + @Override + public DestinationSource create(Connection connection) throws JMSException { + if (connection.getClass().getName().matches("org\\.apache\\.activemq\\.ActiveMQ(XA)?Connection")) { + try { + final Object destSource = connection.getClass().getMethod("getDestinationSource").invoke(connection); + return type -> getNames(destSource, type); + } catch (Exception e) { + // Ignore + } + } + return null; + } + + private List<String> getNames(Object destSource, DestinationSource.DestinationType type) { + try { + if (type == DestinationSource.DestinationType.Queue) { + @SuppressWarnings("unchecked") + Set<Queue> queues = (Set) destSource.getClass().getMethod("getQueues").invoke(destSource); + List<String> names = new ArrayList<>(); + for (Queue queue : queues) { + names.add(queue.getQueueName()); + } + return names; + } + if (type == DestinationSource.DestinationType.Topic) { + @SuppressWarnings("unchecked") + Set<Topic> topics = (Set) destSource.getClass().getMethod("getTopics").invoke(destSource); + List<String> names = new ArrayList<>(); + for (Topic topic : topics) { + names.add(topic.getTopicName()); + } + return names; + } + } catch (Exception e) { + // Ignore + } + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java new file mode 100644 index 0000000..3659a23 --- /dev/null +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueRequestor; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.StringReader; +import java.util.Collections; +import java.util.List; + +class ArtemisDestinationSourceFactory implements DestinationSource.Factory { + + @Override + public DestinationSource create(Connection connection) throws JMSException { + if (connection.getClass().getName().matches("org\\.apache\\.activemq\\.artemis\\.jms\\.client\\.ActiveMQ(XA)?Connection")) { + return type -> getNames(connection, type); + } + return null; + } + + private List<String> getNames(Connection connection, DestinationSource.DestinationType type) { + try { + QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + Queue managementQueue = session.createQueue("activemq.management"); + QueueRequestor requestor = new QueueRequestor(session, managementQueue); + connection.start(); + TextMessage m = session.createTextMessage(); + m.setStringProperty("_AMQ_ResourceName", "broker"); + m.setStringProperty("_AMQ_OperationName", "getQueueNames"); + String routing = type == DestinationSource.DestinationType.Queue ? "ANYCAST" : "MULTICAST"; + m.setText("[\"" + routing + "\"]"); + Message reply = requestor.request(m); + String json = ((TextMessage) reply).getText(); + List<?> array = (List<?>) JsonReader.read(new StringReader(json)); + return (List<String>) array.get(0); + } catch (Exception e) { + return Collections.emptyList(); + } + } +} http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java new file mode 100644 index 0000000..efc7bcd --- /dev/null +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import javax.jms.Connection; +import javax.jms.JMSException; +import java.util.List; + +interface DestinationSource { + + enum DestinationType { + Queue, Topic + } + + interface Factory { + + DestinationSource create(Connection connection) throws JMSException; + } + + List<String> getNames(DestinationType type); +} http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java index 80eb457..63ff95a 100644 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java @@ -19,6 +19,8 @@ package org.apache.karaf.jms.internal; import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -46,19 +48,17 @@ public class JmsConnector implements Closeable { } private ServiceReference<ConnectionFactory> lookupConnectionFactory(String name) { - Collection<ServiceReference<ConnectionFactory>> references; try { - references = bc.getServiceReferences(ConnectionFactory.class, "(|(osgi.jndi.service.name=" + name + ")(name=" + name + ")(service.id=" + name + "))"); + Collection<ServiceReference<ConnectionFactory>> references = bc.getServiceReferences( + ConnectionFactory.class, + "(|(osgi.jndi.service.name=" + name + ")(name=" + name + ")(service.id=" + name + "))"); + return references.stream() + .sorted(Comparator.<ServiceReference<?>>naturalOrder().reversed()) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No JMS connection factory found for " + name)); } catch (InvalidSyntaxException e) { throw new RuntimeException("Error finding connection factory service " + name, e); } - if (references == null || references.size() == 0) { - throw new IllegalArgumentException("No JMS connection factory found for " + name); - } - if (references.size() > 1) { - throw new IllegalArgumentException("Multiple JMS connection factories found for " + name); - } - return references.iterator().next(); } @Override @@ -84,7 +84,7 @@ public class JmsConnector implements Closeable { public Connection connect() throws JMSException { reference = this.lookupConnectionFactory(connectionFactoryName); - ConnectionFactory cf = (ConnectionFactory) bc.getService(reference); + ConnectionFactory cf = bc.getService(reference); connection = cf.createConnection(username, password); connection.start(); return connection; http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java index 6f79794..f460018 100644 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java @@ -16,11 +16,6 @@ */ package org.apache.karaf.jms.internal; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.advisory.DestinationSource; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.pool.PooledConnection; import org.apache.karaf.jms.JmsMessage; import org.apache.karaf.jms.JmsService; import org.apache.karaf.util.TemplateUtils; @@ -32,7 +27,12 @@ import javax.jms.*; import java.io.*; import java.lang.IllegalStateException; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.*; +import java.util.stream.Collectors; /** * Default implementation of the JMS Service. @@ -40,11 +40,10 @@ import java.util.*; public class JmsServiceImpl implements JmsService { private BundleContext bundleContext; - private File deployFolder; + private Path deployFolder; public JmsServiceImpl() { - File karafBase = new File(System.getProperty("karaf.base")); - deployFolder = new File(karafBase, "deploy"); + deployFolder = Paths.get(System.getProperty("karaf.base"), "deploy"); } @Override @@ -54,13 +53,15 @@ public class JmsServiceImpl implements JmsService { @Override public void create(String name, String type, String url, String username, String password) throws Exception { - if (!type.equalsIgnoreCase("activemq") && !type.equalsIgnoreCase("webspheremq")) { + if (!type.equalsIgnoreCase("activemq") + && !type.equalsIgnoreCase("artemis") + && !type.equalsIgnoreCase("webspheremq")) { throw new IllegalArgumentException("JMS connection factory type not known"); } - File outFile = getConnectionFactoryFile(name); + Path outFile = getConnectionFactoryFile(name); String template; - HashMap<String, String> properties = new HashMap<String, String>(); + HashMap<String, String> properties = new HashMap<>(); properties.put("name", name); if (type.equalsIgnoreCase("activemq")) { @@ -69,6 +70,12 @@ public class JmsServiceImpl implements JmsService { properties.put("username", username); properties.put("password", password); template = "connectionfactory-activemq.xml"; + } else if (type.equalsIgnoreCase("artemis")) { + // artemis + properties.put("url", url); + properties.put("username", username); + properties.put("password", password); + template = "connectionfactory-artemis.xml"; } else { // webspheremq String[] splitted = url.split("/"); @@ -86,75 +93,66 @@ public class JmsServiceImpl implements JmsService { if (is == null) { throw new IllegalArgumentException("Template resource " + template + " doesn't exist"); } - TemplateUtils.createFromTemplate(outFile, is, properties); + TemplateUtils.createFromTemplate(outFile.toFile(), is, properties); } - private File getConnectionFactoryFile(String name) { - return new File(deployFolder, "connectionfactory-" + name + ".xml"); + private Path getConnectionFactoryFile(String name) { + return deployFolder.resolve("connectionfactory-" + name + ".xml"); } @Override public void delete(String name) throws Exception { - File connectionFactoryFile = getConnectionFactoryFile(name); - if (!connectionFactoryFile.exists()) { - throw new IllegalStateException("The JMS connection factory file " + connectionFactoryFile.getPath() + " doesn't exist"); + Path connectionFactoryFile = getConnectionFactoryFile(name); + if (!Files.isRegularFile(connectionFactoryFile)) { + throw new IllegalStateException("The JMS connection factory file " + connectionFactoryFile + " doesn't exist"); } - connectionFactoryFile.delete(); + Files.delete(connectionFactoryFile); } - @SuppressWarnings("rawtypes") @Override public List<String> connectionFactories() throws Exception { - List<String> connectionFactories = new ArrayList<String>(); - ServiceReference[] references = bundleContext.getServiceReferences(ConnectionFactory.class.getName(), null); - if (references != null) { - for (ServiceReference reference : references) { - if (reference.getProperty("osgi.jndi.service.name") != null) { - connectionFactories.add((String) reference.getProperty("osgi.jndi.service.name")); - } else if (reference.getProperty("name") != null) { - connectionFactories.add((String) reference.getProperty("name")); - } else { - connectionFactories.add(reference.getProperty(Constants.SERVICE_ID).toString()); - } - } + return bundleContext.getServiceReferences(ConnectionFactory.class, null).stream() + .map(this::getConnectionFactoryName) + .distinct() + .collect(Collectors.toList()); + } + + private String getConnectionFactoryName(ServiceReference<ConnectionFactory> reference) { + if (reference.getProperty("osgi.jndi.service.name") != null) { + return (String) reference.getProperty("osgi.jndi.service.name"); + } else if (reference.getProperty("name") != null) { + return (String) reference.getProperty("name"); + } else { + return reference.getProperty(Constants.SERVICE_ID).toString(); } - return connectionFactories; } @Override public List<String> connectionFactoryFileNames() throws Exception { - String[] connectionFactoryFileNames = deployFolder.list(new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - return name.startsWith("connectionfactory-") && name.endsWith(".xml"); - } - }); - - return Arrays.asList(connectionFactoryFileNames); + return Files.list(deployFolder) + .map(Path::getFileName) + .map(Path::toString) + .filter(name -> name.startsWith("connectionfactory-") && name.endsWith(".xml")) + .collect(Collectors.toList()); } @Override public Map<String, String> info(String connectionFactory, String username, String password) throws IOException, JMSException { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { ConnectionMetaData metaData = connector.connect().getMetaData(); - Map<String, String> map = new HashMap<String, String>(); + Map<String, String> map = new HashMap<>(); map.put("product", metaData.getJMSProviderName()); map.put("version", metaData.getProviderVersion()); return map; - } finally { - connector.close(); } } - @SuppressWarnings("unchecked") @Override public int count(String connectionFactory, final String destination, String username, String password) throws IOException, JMSException { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { Session session = connector.createSession(); QueueBrowser browser = session.createBrowser(session.createQueue(destination)); + @SuppressWarnings("unchecked") Enumeration<Message> enumeration = browser.getEnumeration(); int count = 0; while (enumeration.hasMoreElements()) { @@ -163,67 +161,57 @@ public class JmsServiceImpl implements JmsService { } browser.close(); return count; - } finally { - connector.close(); } } private DestinationSource getDestinationSource(Connection connection) throws JMSException { - if (connection instanceof PooledConnection) { - connection = ((PooledConnection) connection).getConnection(); + while (true) { + try { + Method mth = connection.getClass().getMethod("getConnection"); + connection = (Connection) mth.invoke(connection); + } catch (Throwable e) { + break; + } } - if (connection instanceof ActiveMQConnection) { - return ((ActiveMQConnection) connection).getDestinationSource(); - } else { - return null; + List<DestinationSource.Factory> factories = Arrays.asList( + new ActiveMQDestinationSourceFactory(), + new ArtemisDestinationSourceFactory() + ); + DestinationSource source = null; + for (DestinationSource.Factory factory : factories) { + source = factory.create(connection); + if (source != null) { + break; + } + } + if (source == null) { + source = d -> Collections.emptyList(); } + return source; } @Override public List<String> queues(String connectionFactory, String username, String password) throws JMSException, IOException { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { - List<String> queues = new ArrayList<String>(); - DestinationSource destinationSource = getDestinationSource(connector.connect()); - if (destinationSource != null) { - Set<ActiveMQQueue> activeMQQueues = destinationSource.getQueues(); - for (ActiveMQQueue activeMQQueue : activeMQQueues) { - queues.add(activeMQQueue.getQueueName()); - } - } - return queues; - } finally { - connector.close(); + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { + return getDestinationSource(connector.connect()).getNames(DestinationSource.DestinationType.Queue); } } @Override public List<String> topics(String connectionFactory, String username, String password) throws IOException, JMSException { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { - DestinationSource destinationSource = getDestinationSource(connector.connect()); - List<String> topics = new ArrayList<String>(); - if (destinationSource != null) { - Set<ActiveMQTopic> activeMQTopics = destinationSource.getTopics(); - for (ActiveMQTopic activeMQTopic : activeMQTopics) { - topics.add(activeMQTopic.getTopicName()); - } - } - return topics; - } finally { - connector.close(); + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { + return getDestinationSource(connector.connect()).getNames(DestinationSource.DestinationType.Topic); } } - @SuppressWarnings("unchecked") @Override public List<JmsMessage> browse(String connectionFactory, final String queue, final String filter, String username, String password) throws JMSException, IOException { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { - List<JmsMessage> messages = new ArrayList<JmsMessage>(); + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { + List<JmsMessage> messages = new ArrayList<>(); Session session = connector.createSession(); QueueBrowser browser = session.createBrowser(session.createQueue(queue), filter); + @SuppressWarnings("unchecked") Enumeration<Message> enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { Message message = enumeration.nextElement(); @@ -232,16 +220,13 @@ public class JmsServiceImpl implements JmsService { } browser.close(); return messages; - } finally { - connector.close(); } } @Override public void send(String connectionFactory, final String queue, final String body, final String replyTo, String username, String password) throws IOException, JMSException { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { Session session = connector.createSession(); Message message = session.createTextMessage(body); if (replyTo != null) { @@ -250,16 +235,13 @@ public class JmsServiceImpl implements JmsService { MessageProducer producer = session.createProducer(session.createQueue(queue)); producer.send(message); producer.close(); - } finally { - connector.close(); } } @Override public int consume(String connectionFactory, final String queue, final String selector, String username, String password) throws Exception { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { int count = 0; Session session = connector.createSession(); MessageConsumer consumer = session.createConsumer(session.createQueue(queue), selector); @@ -271,16 +253,13 @@ public class JmsServiceImpl implements JmsService { } } while (message != null); return count; - } finally { - connector.close(); } } @Override public int move(String connectionFactory, final String sourceQueue, final String targetQueue, final String selector, String username, String password) throws IOException, JMSException { - JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); - try { + try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { int count = 0; Session session = connector.createSession(Session.SESSION_TRANSACTED); MessageConsumer consumer = session.createConsumer(session.createQueue(sourceQueue), selector); @@ -296,8 +275,6 @@ public class JmsServiceImpl implements JmsService { session.commit(); consumer.close(); return count; - } finally { - connector.close(); } } http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/internal/JsonReader.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JsonReader.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JsonReader.java new file mode 100644 index 0000000..6c08a19 --- /dev/null +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/JsonReader.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + */ +public final class JsonReader { + + // + // Implementation + // + + private final Reader reader; + private final StringBuilder recorder; + private int current; + private int line = 1; + private int column; + + private JsonReader(Reader reader) { + this.reader = reader; + recorder = new StringBuilder(); + } + + public static Object read(Reader reader) throws IOException { + return new JsonReader(reader).parse(); + } + + public static Object read(InputStream is) throws IOException { + return new JsonReader(new InputStreamReader(is)).parse(); + } + + private Object parse() throws IOException { + read(); + skipWhiteSpace(); + Object result = readValue(); + skipWhiteSpace(); + if (!endOfText()) { + throw error("Unexpected character"); + } + return result; + } + + private Object readValue() throws IOException { + switch (current) { + case 'n': + return readNull(); + case 't': + return readTrue(); + case 'f': + return readFalse(); + case '"': + return readString(); + case '[': + return readArray(); + case '{': + return readObject(); + case '-': + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + return readNumber(); + default: + throw expected("value"); + } + } + + private Collection<?> readArray() throws IOException { + read(); + Collection<Object> array = new ArrayList<>(); + skipWhiteSpace(); + if (readChar(']')) { + return array; + } + do { + skipWhiteSpace(); + array.add(readValue()); + skipWhiteSpace(); + } while (readChar(',')); + if (!readChar(']')) { + throw expected("',' or ']'"); + } + return array; + } + + private Map<String, Object> readObject() throws IOException { + read(); + Map<String, Object> object = new HashMap<>(); + skipWhiteSpace(); + if (readChar('}')) { + return object; + } + do { + skipWhiteSpace(); + String name = readName(); + skipWhiteSpace(); + if (!readChar(':')) { + throw expected("':'"); + } + skipWhiteSpace(); + object.put(name, readValue()); + skipWhiteSpace(); + } while (readChar(',')); + if (!readChar('}')) { + throw expected("',' or '}'"); + } + return object; + } + + private Object readNull() throws IOException { + read(); + readRequiredChar('u'); + readRequiredChar('l'); + readRequiredChar('l'); + return null; + } + + private Boolean readTrue() throws IOException { + read(); + readRequiredChar('r'); + readRequiredChar('u'); + readRequiredChar('e'); + return Boolean.TRUE; + } + + private Boolean readFalse() throws IOException { + read(); + readRequiredChar('a'); + readRequiredChar('l'); + readRequiredChar('s'); + readRequiredChar('e'); + return Boolean.FALSE; + } + + private void readRequiredChar(char ch) throws IOException { + if (!readChar(ch)) { + throw expected("'" + ch + "'"); + } + } + + private String readString() throws IOException { + read(); + recorder.setLength(0); + while (current != '"') { + if (current == '\\') { + readEscape(); + } else if (current < 0x20) { + throw expected("valid string character"); + } else { + recorder.append((char) current); + read(); + } + } + read(); + return recorder.toString(); + } + + private void readEscape() throws IOException { + read(); + switch (current) { + case '"': + case '/': + case '\\': + recorder.append((char) current); + break; + case 'b': + recorder.append('\b'); + break; + case 'f': + recorder.append('\f'); + break; + case 'n': + recorder.append('\n'); + break; + case 'r': + recorder.append('\r'); + break; + case 't': + recorder.append('\t'); + break; + case 'u': + char[] hexChars = new char[4]; + for (int i = 0; i < 4; i++) { + read(); + if (!isHexDigit(current)) { + throw expected("hexadecimal digit"); + } + hexChars[i] = (char) current; + } + recorder.append((char) Integer.parseInt(String.valueOf(hexChars), 16)); + break; + default: + throw expected("valid escape sequence"); + } + read(); + } + + private Number readNumber() throws IOException { + recorder.setLength(0); + readAndAppendChar('-'); + int firstDigit = current; + if (!readAndAppendDigit()) { + throw expected("digit"); + } + if (firstDigit != '0') { + while (readAndAppendDigit()) { + // Do nothing + } + } + readFraction(); + readExponent(); + return Double.parseDouble(recorder.toString()); + } + + private boolean readFraction() throws IOException { + if (!readAndAppendChar('.')) { + return false; + } + if (!readAndAppendDigit()) { + throw expected("digit"); + } + while (readAndAppendDigit()) { + // Do nothing + } + return true; + } + + private boolean readExponent() throws IOException { + if (!readAndAppendChar('e') && !readAndAppendChar('E')) { + return false; + } + if (!readAndAppendChar('+')) { + readAndAppendChar('-'); + } + if (!readAndAppendDigit()) { + throw expected("digit"); + } + while (readAndAppendDigit()) { + // Do nothing + } + return true; + } + + private String readName() throws IOException { + if (current != '"') { + throw expected("name"); + } + readString(); + return recorder.toString(); + } + + private boolean readAndAppendChar(char ch) throws IOException { + if (current != ch) { + return false; + } + recorder.append(ch); + read(); + return true; + } + + private boolean readChar(char ch) throws IOException { + if (current != ch) { + return false; + } + read(); + return true; + } + + private boolean readAndAppendDigit() throws IOException { + if (!isDigit(current)) { + return false; + } + recorder.append((char) current); + read(); + return true; + } + + private void skipWhiteSpace() throws IOException { + while (isWhiteSpace(current) && !endOfText()) { + read(); + } + } + + private void read() throws IOException { + if (endOfText()) { + throw error("Unexpected end of input"); + } + column++; + if (current == '\n') { + line++; + column = 0; + } + current = reader.read(); + } + + private boolean endOfText() { + return current == -1; + } + + private IOException expected(String expected) { + if (endOfText()) { + return error("Unexpected end of input"); + } + return error("Expected " + expected); + } + + private IOException error(String message) { + return new IOException(message + " at " + line + ":" + column); + } + + private static boolean isWhiteSpace(int ch) { + return ch == ' ' || ch == '\t' || ch == '\n' || ch == '\r'; + } + + private static boolean isDigit(int ch) { + return ch >= '0' && ch <= '9'; + } + + private static boolean isHexDigit(int ch) { + return ch >= '0' && ch <= '9' || ch >= 'a' && ch <= 'f' || ch >= 'A' && ch <= 'F'; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java new file mode 100644 index 0000000..863a092 --- /dev/null +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java @@ -0,0 +1,33 @@ +package org.apache.karaf.jms.internal.osgi; + +import org.apache.karaf.jms.JmsService; +import org.apache.karaf.jms.internal.JmsMBeanImpl; +import org.apache.karaf.jms.internal.JmsServiceImpl; +import org.apache.karaf.shell.api.console.CommandLoggingFilter; +import org.apache.karaf.shell.support.RegexCommandLoggingFilter; +import org.apache.karaf.util.tracker.BaseActivator; +import org.apache.karaf.util.tracker.annotation.ProvideService; +import org.apache.karaf.util.tracker.annotation.RequireService; +import org.apache.karaf.util.tracker.annotation.Services; + +@Services( + provides = @ProvideService(JmsService.class) +) +public class Activator extends BaseActivator { + @Override + protected void doStart() throws Exception { + JmsServiceImpl service = new JmsServiceImpl(); + service.setBundleContext(bundleContext); + register(JmsService.class, service); + + JmsMBeanImpl mbean = new JmsMBeanImpl(); + mbean.setJmsService(service); + registerMBean(mbean, "type=jms"); + + RegexCommandLoggingFilter filter = new RegexCommandLoggingFilter(); + filter.addRegEx("create +.*?--password ([^ ]+)", 2); + filter.addRegEx("create +.*?-p ([^ ]+)", 2); + register(CommandLoggingFilter.class, filter); + + } +} http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/resources/OSGI-INF/blueprint/jms-core.xml ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/OSGI-INF/blueprint/jms-core.xml b/jms/core/src/main/resources/OSGI-INF/blueprint/jms-core.xml deleted file mode 100644 index c011b6a..0000000 --- a/jms/core/src/main/resources/OSGI-INF/blueprint/jms-core.xml +++ /dev/null @@ -1,55 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version - 2.0 (the "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 Unless required by - applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the - License. - --> -<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" - xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0" - default-activation="lazy"> - - <ext:property-placeholder placeholder-prefix="$[" placeholder-suffix="]"/> - - <bean id="jmsService" class="org.apache.karaf.jms.internal.JmsServiceImpl"> - <property name="bundleContext" ref="blueprintBundleContext"/> - </bean> - - <service ref="jmsService" interface="org.apache.karaf.jms.JmsService" /> - - <!-- Management --> - <bean id="jmsMBeanImpl" class="org.apache.karaf.jms.internal.JmsMBeanImpl"> - <property name="jmsService" ref="jmsService"/> - </bean> - - <service ref="jmsMBeanImpl" auto-export="interfaces"> - <service-properties> - <entry key="jmx.objectname" value="org.apache.karaf:type=jms,name=$[karaf.name]"/> - </service-properties> - </service> - - <!-- Lets try to filter passwords out of the logs --> - <service auto-export="interfaces"> - <bean class="org.apache.karaf.shell.support.RegexCommandLoggingFilter"> - <property name="pattern" value="create +.*?--password ([^ ]+)"/> - <property name="group" value="2"/> - </bean> - </service> - <service auto-export="interfaces"> - <bean class="org.apache.karaf.shell.support.RegexCommandLoggingFilter"> - <property name="pattern" value="create +.*?-p ([^ ]+)"/> - <property name="group" value="2"/> - </bean> - </service> - -</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml index d770734..da2ad1a 100644 --- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml +++ b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml @@ -17,22 +17,18 @@ --> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> - <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> - <property name="brokerURL" value="${url}" /> - <property name="userName" value="${username}" /> - <property name="password" value="${password}" /> - </bean> - - <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> - <property name="maxConnections" value="8" /> - <property name="connectionFactory" ref="connectionFactory" /> - </bean> - - <service ref="pooledConnectionFactory" interface="javax.jms.ConnectionFactory"> + <service interface="javax.jms.ConnectionFactory"> <service-properties> <entry key="name" value="${name}" /> <entry key="osgi.jndi.service.name" value="jms/${name}" /> + <entry key="karaf.jms.wrap" value="true" /> + <entry key="karaf.jms.pool.maxConnections" value="8" /> </service-properties> + <bean class="org.apache.activemq.ActiveMQConnectionFactory"> + <property name="brokerURL" value="${url}" /> + <property name="userName" value="${username}" /> + <property name="password" value="${password}" /> + </bean> </service> </blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml new file mode 100644 index 0000000..67b1f54 --- /dev/null +++ b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version + 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 Unless required by + applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the + License. + --> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> + + <service interface="javax.jms.ConnectionFactory"> + <service-properties> + <entry key="name" value="${name}" /> + <entry key="osgi.jndi.service.name" value="jms/${name}" /> + <entry key="karaf.jms.wrap" value="true" /> + <entry key="karaf.jms.pool.maxConnections" value="8" /> + </service-properties> + <bean class="org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory"> + <argument value="${url}" /> + <argument value="${username}" /> + <argument value="${password}" /> + <property name="producerWindowSize" value="-1" /> + </bean> + </service> + +</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml index 3123f49..999c85b 100644 --- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml +++ b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml @@ -17,20 +17,19 @@ --> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> - <bean id="wmqConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"> - <property name="transportType" value="1" /> - <property name="hostName" value="${hostname}" /> - <property name="port" value="${port}" /> - <property name="queueManager" value="${queuemanager}" /> - <property name="channel" value="${channel}" /> - <property name="useConnectionPooling" value="true" /> - </bean> - - <service ref="wmqConnectionFactory" interface="javax.jms.ConnectionFactory"> + <service interface="javax.jms.ConnectionFactory"> <service-properties> <entry key="name" value="${name}"/> <entry key="osgi.jndi.service.name" value="jms/${name}"/> + <entry key="karaf.jms.wrap" value="true" /> </service-properties> + <bean class="com.ibm.mq.jms.MQQueueConnectionFactory"> + <property name="transportType" value="1" /> + <property name="hostName" value="${hostname}" /> + <property name="port" value="${port}" /> + <property name="queueManager" value="${queuemanager}" /> + <property name="channel" value="${channel}" /> + </bean> </service> </blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/pom.xml ---------------------------------------------------------------------- diff --git a/jms/pom.xml b/jms/pom.xml index 231b40f..3b3b185 100644 --- a/jms/pom.xml +++ b/jms/pom.xml @@ -34,6 +34,7 @@ <name>Apache Karaf :: Features</name> <modules> - <module>core</module> + <module>core</module> + <module>pool</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/pool/pom.xml ---------------------------------------------------------------------- diff --git a/jms/pool/pom.xml b/jms/pool/pom.xml new file mode 100644 index 0000000..21278d2 --- /dev/null +++ b/jms/pool/pom.xml @@ -0,0 +1,104 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.karaf</groupId> + <artifactId>karaf</artifactId> + <version>4.2.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.karaf.jms</groupId> + <artifactId>org.apache.karaf.jms.pool</artifactId> + <packaging>bundle</packaging> + <name>Apache Karaf :: JMS :: Pool</name> + <description>This bundle provides pooling implementation of the JMS service.</description> + + <properties> + <appendedResourcesDirectory>${basedir}/../../etc/appended-resources</appendedResourcesDirectory> + </properties> + + <dependencies> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> + <version>2.4.2</version> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>${project.basedir}/src/main/resources</directory> + <includes> + <include>**/*</include> + </includes> + </resource> + <resource> + <directory>${project.basedir}/src/main/resources</directory> + <filtering>true</filtering> + <includes> + <include>**/*.info</include> + </includes> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <configuration> + <instructions> + <Export-Package> + </Export-Package> + <Import-Package> + javax.jms;version="[1.1,3)", + * + </Import-Package> + <Private-Package> + org.apache.karaf.jms.pool.internal, + org.apache.karaf.jms.pool.internal.osgi + </Private-Package> + <Bundle-Activator> + org.apache.karaf.jms.pool.internal.osgi.Activator + </Bundle-Activator> + </instructions> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java new file mode 100644 index 0000000..9dab2fc --- /dev/null +++ b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.pool.internal; + +/** + * A cache key for the connection details + * + * + */ +public class ConnectionKey { + private String userName; + private String password; + private int hash; + + public ConnectionKey(String userName, String password) { + this.password = password; + this.userName = userName; + hash = 31; + if (userName != null) { + hash += userName.hashCode(); + } + hash *= 31; + if (password != null) { + hash += password.hashCode(); + } + } + + public int hashCode() { + return hash; + } + + public boolean equals(Object that) { + if (this == that) { + return true; + } + if (that instanceof ConnectionKey) { + return equals((ConnectionKey)that); + } + return false; + } + + public boolean equals(ConnectionKey that) { + return isEqual(this.userName, that.userName) && isEqual(this.password, that.password); + } + + public String getPassword() { + return password; + } + + public String getUserName() { + return userName; + } + + public static boolean isEqual(Object o1, Object o2) { + if (o1 == o2) { + return true; + } + return o1 != null && o2 != null && o1.equals(o2); + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java new file mode 100644 index 0000000..fbf0384 --- /dev/null +++ b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.pool.internal; + +import org.apache.commons.pool2.KeyedPooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; + +import javax.jms.Connection; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Session; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Holds a real JMS connection along with the session pools associated with it. + * <p/> + * Instances of this class are shared amongst one or more PooledConnection object and must + * track the session objects that are loaned out for cleanup on close as well as ensuring + * that the temporary destinations of the managed Connection are purged when all references + * to this ConnectionPool are released. + */ +public class ConnectionPool { + protected Connection connection; + private int referenceCount; + private long lastUsed = System.currentTimeMillis(); + private final long firstUsed = lastUsed; + private boolean hasExpired; + private int idleTimeout = 30 * 1000; + private long expiryTimeout = 0l; + private boolean useAnonymousProducers = true; + + private final AtomicBoolean started = new AtomicBoolean(false); + private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool; + private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<>(); + + public ConnectionPool(Connection connection) { + + this.connection = connection; + + // Create our internal Pool of session instances. + this.sessionPool = new GenericKeyedObjectPool<>( + new KeyedPooledObjectFactory<SessionKey, PooledSession>() { + + @Override + public void activateObject(SessionKey key, PooledObject<PooledSession> session) throws Exception { + ConnectionPool.this.loanedSessions.add(session.getObject()); + } + + @Override + public void destroyObject(SessionKey key, PooledObject<PooledSession> session) throws Exception { + ConnectionPool.this.loanedSessions.remove(session.getObject()); + session.getObject().getInternalSession().close(); + } + + @Override + public PooledObject<PooledSession> makeObject(SessionKey key) throws Exception { + Session session = makeSession(key); + return new DefaultPooledObject<>(new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers)); + } + + @Override + public void passivateObject(SessionKey key, PooledObject<PooledSession> session) throws Exception { + ConnectionPool.this.loanedSessions.remove(session.getObject()); + } + + @Override + public boolean validateObject(SessionKey key, PooledObject<PooledSession> session) { + return true; + } + } + ); + } + + // useful when external failure needs to force expiry + public void setHasExpired(boolean val) { + hasExpired = val; + } + + protected Session makeSession(SessionKey key) throws JMSException { + return connection.createSession(key.isTransacted(), key.getAckMode()); + } + + public void start() throws JMSException { + if (started.compareAndSet(false, true)) { + try { + connection.start(); + } catch (JMSException e) { + started.set(false); + throw(e); + } + } + } + + public synchronized Connection getConnection() { + return connection; + } + + public Session createSession(boolean transacted, int ackMode) throws JMSException { + SessionKey key = new SessionKey(transacted, ackMode); + PooledSession session; + try { + session = sessionPool.borrowObject(key); + } catch (Exception e) { + IllegalStateException illegalStateException = new IllegalStateException(e.toString()); + illegalStateException.initCause(e); + throw illegalStateException; + } + return session; + } + + public synchronized void close() { + if (connection != null) { + try { + sessionPool.close(); + } catch (Exception e) { + } finally { + try { + connection.close(); + } catch (Exception e) { + } finally { + connection = null; + } + } + } + } + + public synchronized void incrementReferenceCount() { + referenceCount++; + lastUsed = System.currentTimeMillis(); + } + + public synchronized void decrementReferenceCount() { + referenceCount--; + lastUsed = System.currentTimeMillis(); + if (referenceCount == 0) { + // Loaned sessions are those that are active in the sessionPool and + // have not been closed by the client before closing the connection. + // These need to be closed so that all session's reflect the fact + // that the parent Connection is closed. + for (PooledSession session : this.loanedSessions) { + try { + session.close(); + } catch (Exception e) { + } + } + this.loanedSessions.clear(); + + expiredCheck(); + } + } + + /** + * Determines if this Connection has expired. + * <p/> + * A ConnectionPool is considered expired when all references to it are released AND either + * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed. + * Once a ConnectionPool is determined to have expired its underlying Connection is closed. + * + * @return true if this connection has expired. + */ + public synchronized boolean expiredCheck() { + + boolean expired = false; + + if (connection == null) { + return true; + } + + if (hasExpired) { + if (referenceCount == 0) { + close(); + expired = true; + } + } + + if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) { + hasExpired = true; + if (referenceCount == 0) { + close(); + expired = true; + } + } + + // Only set hasExpired here is no references, as a Connection with references is by + // definition not idle at this time. + if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) { + hasExpired = true; + close(); + expired = true; + } + + return expired; + } + + public int getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(int idleTimeout) { + this.idleTimeout = idleTimeout; + } + + public void setExpiryTimeout(long expiryTimeout) { + this.expiryTimeout = expiryTimeout; + } + + public long getExpiryTimeout() { + return expiryTimeout; + } + + public int getMaximumActiveSessionPerConnection() { + return this.sessionPool.getMaxTotalPerKey(); + } + + public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) { + this.sessionPool.setMaxTotalPerKey(maximumActiveSessionPerConnection); + } + + public boolean isUseAnonymousProducers() { + return this.useAnonymousProducers; + } + + public void setUseAnonymousProducers(boolean value) { + this.useAnonymousProducers = value; + } + + /** + * @return the total number of Pooled session including idle sessions that are not + * currently loaned out to any client. + */ + public int getNumSessions() { + return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive(); + } + + /** + * @return the total number of Sessions that are in the Session pool but not loaned out. + */ + public int getNumIdleSessions() { + return this.sessionPool.getNumIdle(); + } + + /** + * @return the total number of Session's that have been loaned to PooledConnection instances. + */ + public int getNumActiveSessions() { + return this.sessionPool.getNumActive(); + } + + /** + * Configure whether the createSession method should block when there are no more idle sessions and the + * pool already contains the maximum number of active sessions. If false the create method will fail + * and throw an exception. + * + * @param block + * Indicates whether blocking should be used to wait for more space to create a session. + */ + public void setBlockIfSessionPoolIsFull(boolean block) { + this.sessionPool.setBlockWhenExhausted(block); + } + + public boolean isBlockIfSessionPoolIsFull() { + return this.sessionPool.getBlockWhenExhausted(); + } + + /** + * Returns the timeout to use for blocking creating new sessions + * + * @return true if the pooled Connection createSession method will block when the limit is hit. + * @see #setBlockIfSessionPoolIsFull(boolean) + */ + public long getBlockIfSessionPoolIsFullTimeout() { + return this.sessionPool.getMaxWaitMillis(); + } + + /** + * Controls the behavior of the internal session pool. By default the call to + * Connection.getSession() will block if the session pool is full. This setting + * will affect how long it blocks and throws an exception after the timeout. + * + * The size of the session pool is controlled by the @see #maximumActive + * property. + * + * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull + * property + * + * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true, + * then use this setting to configure how long to block before retry + */ + public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { + this.sessionPool.setMaxWaitMillis(blockIfSessionPoolIsFullTimeout); + } + + @Override + public String toString() { + return "ConnectionPool[" + connection + "]"; + } +} http://git-wip-us.apache.org/repos/asf/karaf/blob/dba4b37c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java new file mode 100755 index 0000000..c5900d1 --- /dev/null +++ b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.pool.internal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLServerSocket; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +public final class IntrospectionSupport { + + private static final Logger LOG = LoggerFactory.getLogger(IntrospectionSupport.class); + + private IntrospectionSupport() { + } + + @SuppressWarnings("rawtypes") + public static boolean setProperties(Object target, Map props) { + boolean rc = false; + + if (target == null) { + throw new IllegalArgumentException("target was null."); + } + if (props == null) { + throw new IllegalArgumentException("props was null."); + } + + for (Iterator<?> iter = props.entrySet().iterator(); iter.hasNext();) { + Entry<?,?> entry = (Entry<?,?>)iter.next(); + if (setProperty(target, (String)entry.getKey(), entry.getValue())) { + iter.remove(); + rc = true; + } + } + + return rc; + } + + public static boolean setProperty(Object target, String name, Object value) { + try { + Class<?> clazz = target.getClass(); + if (target instanceof SSLServerSocket) { + // overcome illegal access issues with internal implementation class + clazz = SSLServerSocket.class; + } + Method setter = findSetterMethod(clazz, name); + if (setter == null) { + return false; + } + + // If the type is null or it matches the needed type, just use the + // value directly + if (value == null || value.getClass() == setter.getParameterTypes()[0]) { + setter.invoke(target, value); + } else { + // We need to convert it + setter.invoke(target, convert(value, setter.getParameterTypes()[0])); + } + return true; + } catch (Exception e) { + LOG.error(String.format("Could not set property %s on %s", name, target), e); + return false; + } + } + + @SuppressWarnings({ + "rawtypes", "unchecked" + }) + private static Object convert(Object value, Class to) { + if (value == null) { + // lets avoid NullPointerException when converting to boolean for null values + if (boolean.class.isAssignableFrom(to)) { + return Boolean.FALSE; + } + return null; + } + + // eager same instance type test to avoid the overhead of invoking the type converter + // if already same type + if (to.isAssignableFrom(value.getClass())) { + return to.cast(value); + } + + if (boolean.class.isAssignableFrom(to) && value instanceof String) { + return Boolean.valueOf((String) value); + } + + throw new IllegalArgumentException("Cannot convert from " + value.getClass() + + " to " + to + " with value " + value); + } + + private static Method findSetterMethod(Class<?> clazz, String name) { + // Build the method name. + name = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1); + Method[] methods = clazz.getMethods(); + for (Method method : methods) { + Class<?> params[] = method.getParameterTypes(); + if (method.getName().equals(name) && params.length == 1 ) { + return method; + } + } + return null; + } + +}