This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git

commit 0d2ab666035ea47563339bfce27ed914ae8c2a16
Author: Ian Boston <i...@apache.org>
AuthorDate: Wed Jul 27 12:10:12 2016 +0000

    SLING-5646 MoM API and JMS implementation with example usage by Jobs 
implementation.
    Squashes 27 commits from https://github.com/ieb/sling/tree/jobs_28 as 
follows.
    Added first stab at a message oriented job subsystem
    Added basic implementation for the manager keeping queue implementation 
details abstracted
    Added ActiveMQ implementation for queues and topics, and fixed a number of 
the SPI interfaces in the process
    Basic Test coverage for OOT JMS Broker
    Extracted a MOM API with no Jobs or JMS references, test coverage for 
ActiveMQ impl is 100% class, 93% method, 75% line
    Added missing license headers, documentation and cleaned out unused  
interfaces
    Fixed JMS Transaction issue found by @tmaret
    Coverage for the majority of the jobs code is complete
    Basic unit test coverage complete, core has 94% by lines, AMQ 74%, 100% 
classes and methods
    Added testing environment for a running server, not working yet
    Added ability to detect when the OSGi container has completed bundle 
startup without having to perform http requests
    Start at IT testing with Crankstart
    Fixed issues with shutdown inside a Crankstart container
    Working Crankstart IT framework
    Version that uses Q->Jobs->JobConsumer pattern
    Added a Queue Factory to allow configuration of multiple queues between the 
MOM API and Job Subsystem and move JobConsumers to register with a Job type
    Migrated Subscribers and QueueReaders to a OSGi whiteboard pattern after 
discussion on Sling Dev
    Changes the JobConsumer to use a Callback rather than return a job. This 
was suggested offlist by others in Adobe as a way of improving resource 
consumption
    Added Types to improve type safety in certain areas after suggestions 
offlist
    Fixed issue with OSGi startup in IntelliJ caused by version 4 of the
    Felix framework bundle being present inside the maven pom. Strangely a
    command line build was not impacted.
    Added integration test bundle to test service. Adjusted some of the APIs to 
make using the Job Sub System easier
    Integration tests now starting jobs from messages
    Fixed Startup to work in real Sling/AEM container. The Active MQ OSGi 
bundle contains additional dependencies that cause all sorts of problems, AMQ 
is now being embedded into the AMQ MOM Impl bundle.
    Fixed Queue expiry bug and added AEM Fiddle to run jobs
    Added Documentation for configuration and default sample configuration
    Added explicit requeue mechanism rather than relying on AMQ's requeue 
capabilities
    Moved MoM to new Home
    
    git-svn-id: 
https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1754255 
13f79535-47bb-0310-9956-ffa450edef68
---
 .gitignore                                         |   1 +
 README.md                                          |  79 ++++++
 pom.xml                                            | 237 ++++++++++++++++
 .../amq/ActiveMQConnectionFactoryService.java      | 106 ++++++++
 .../apache/sling/jms/ConnectionFactoryService.java |  36 +++
 .../java/org/apache/sling/jms/JMSMessageTypes.java |  29 ++
 .../java/org/apache/sling/jms/JMSQueueManager.java | 289 ++++++++++++++++++++
 .../java/org/apache/sling/jms/JMSTopicManager.java | 248 +++++++++++++++++
 src/main/java/org/apache/sling/jms/Json.java       |  92 +++++++
 .../resources/org/apache/sling/amq/activemq.xml    | 170 ++++++++++++
 .../org/apache/sling/amq/credentials.properties    |  22 ++
 src/main/resources/org/apache/sling/amq/jetty.xml  | 144 ++++++++++
 .../amq/ActiveMQConnectionFactoryServiceTest.java  |  81 ++++++
 .../org/apache/sling/jms/JMSQueueManagerTest.java  | 301 +++++++++++++++++++++
 .../org/apache/sling/jms/JMSTopicManagerTest.java  | 235 ++++++++++++++++
 src/test/java/org/apache/sling/jms/JsonTest.java   | 118 ++++++++
 16 files changed, 2188 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0d18955
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+activemq-data
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d95f6d2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,79 @@
+# Message Oriented Middleware API implementation using Active MQ.
+
+This bundle implements the MoM API using Active MQ. It supports both Pub/Sub 
and Queue patterns and will run out of the
+box, embedded or connected to a dedicated cluster.
+
+## Out of the Box.
+
+As the name suggests, no configuration is required: the components start, 
create an ActiveMQ instance embedded inside
+the OSGi container and run. On restart, normal or after a crash, provided the 
working directory is not changed or modified, 
+the ActiveMQ embedded server will restart and resume operations. In the event 
the JVM crashes, ActiveMQ will perform recovery
+ by replaying its journal. The embedded server uses KahaDB to store data on 
local disk.
+ 
+## Embedded with custom configuration.
+
+AMQ can be run embedded with custom configuration to allow a cluster of Sling 
instances to form a multi-master AMQ cluster with each 
+Sling instance embedding its own AMQ broker. This is achieved via OSGi 
configuration. (ToDo: Config + Doc)
+
+## External AMQ Broker cluster.
+
+The bundle can be run to use an external AMQ Broker cluster, maintained and 
set up separately from the Sling cluster. To do this, modify the
+Broker URL via OSGi configuration.
+
+# Implementation details.
+
+## AMQ ConnectionFactory.
+
+Running AMQ inside OSGi is very simple. All that is required is the AMQ 
dependencies and an instance of an AMQ PooledConnectionFactory with a
+default localhost URL, vm://localhost:61616. The PooledConnectionFactory will 
trigger the creation of an AMQ Broker if one is not present
+and AMQ will run normally. This service implements an internal service API 
org.apache.sling.jms.ConnectionFactoryService, which enables 
+consumers to get a JMS ConnectionFactory.
+
+## MoM API implementation
+
+The MoM API implementation uses the ConnectionFactory service to get a JMS 
connection. It opens JMS sessions using that connection factory
+and implements the methods in the API. JMS supports both Pub/Sub and Queue 
patterns in the MoM API without much additional work. 
+The JMS sessions are single threaded, so care is taken not to share between 
threads or cause throughput issues with synchronization.
+
+The Map of Maps messages in the MoM API are serialised to JSON using the Gson 
library and transmitted as Text messages. JMS Headers are currently
+not used other than to identify the JSON encoding of the text payload.
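
To make the wire format concrete, here is a minimal sketch of how a nested map becomes the JSON text carried in the Text message. The bundle itself delegates this to Gson; the hand-rolled encoder below is only a standard-library stand-in so that the example runs without dependencies, and its output is not guaranteed to match Gson's byte for byte. The class name `MessageJsonSketch` and the `jobtype` and `payload` keys are hypothetical; only the `_nr` retry counter key comes from the code in this commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the Gson call made by the bundle: a tiny encoder for nested maps.
class MessageJsonSketch {

    static String toJson(Object value) {
        StringBuilder sb = new StringBuilder();
        write(value, sb);
        return sb.toString();
    }

    private static void write(Object value, StringBuilder sb) {
        if (value == null || value instanceof Number || value instanceof Boolean) {
            // null, numbers and booleans are emitted unquoted
            // (NaN and infinities are not valid JSON and are not handled here).
            sb.append(value);
        } else if (value instanceof Map) {
            sb.append('{');
            String sep = "";
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                sb.append(sep);
                quote(String.valueOf(e.getKey()), sb);
                sb.append(':');
                write(e.getValue(), sb);
                sep = ",";
            }
            sb.append('}');
        } else if (value instanceof Iterable) {
            sb.append('[');
            String sep = "";
            for (Object item : (Iterable<?>) value) {
                sb.append(sep);
                write(item, sb);
                sep = ",";
            }
            sb.append(']');
        } else {
            // Everything else is treated as a string.
            quote(value.toString(), sb);
        }
    }

    private static void quote(String s, StringBuilder sb) {
        sb.append('"');
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    // Remaining control characters use the four-digit hex escape.
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        sb.append('"');
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("path", "/content/a");
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("jobtype", "example");   // hypothetical key
        message.put("payload", payload);     // hypothetical key
        message.put("_nr", 0L);              // retry counter, as set by JMSQueueManager.add
        // prints {"jobtype":"example","payload":{"path":"/content/a"},"_nr":0}
        System.out.println(toJson(message));
    }
}
```

A `LinkedHashMap` is used so that the key order in the output is deterministic; the order of keys in the real messages depends on the map implementation the caller passes in.
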
+
+The delivery of messages on Topics and Queues is entirely managed by AMQ with 
no additional code. The retry semantics of the QueueReader API
+are achieved by dispatching JMS messages from within a JMS MessageListener 
onMessage method, and throwing an IllegalArgumentException to JMS
+when a message needs to be re-queued. How retries work, and the backoff 
algorithm used for messages that need to be retried, are managed 
+by ActiveMQ configuration, which supports many scenarios for retrying messages.
+
+## Delivery Retry for Queued messages
+
+The semantics of the MoM API are that a consumer may throw an exception when 
its QueueReader.onMessage method is called. That indicates that the 
+message could not be consumed at this time and should be retried. There are 
several ways that this can be achieved in general, and some 
+AMQ specific ways. By default JMS ensures delivery order. Hence a message on 
the queue that is not dequeued will block other messages on the queue 
+until it is dequeued. AMQ deals with this by allowing a deployer to configure 
queues to retry at the broker rather than attempting to redeliver in 
+order to the same JMS consumer. The configuration is not the default and has to be 
provided by configuring the AMQ broker.
+
+    <broker xmlns="http://activemq.apache.org/schema/core"    
schedulerSupport="true" >
+        .... 
+        <plugins>
+            <redeliveryPlugin fallbackToDeadLetter="true" 
sendToDlqIfMaxRetriesExceeded="true">
+                <redeliveryPolicyMap>
+                    <redeliveryPolicyMap>
+                        <redeliveryPolicyEntries>
+                            <!-- a destination specific policy -->
+                            <redeliveryPolicy queue="SpecialQueue" 
maximumRedeliveries="4" redeliveryDelay="10000" />
+                        </redeliveryPolicyEntries>
+                        <!-- the fallback policy for all other destinations -->
+                        <defaultEntry>
+                            <redeliveryPolicy maximumRedeliveries="4" 
initialRedeliveryDelay="5000" redeliveryDelay="10000" />
+                        </defaultEntry>
+                    </redeliveryPolicyMap>
+                </redeliveryPolicyMap>
+            </redeliveryPlugin>
+        </plugins>
+        
+This can also be achieved in code, by dequeuing all messages regardless of 
failure. Those that generate an exception on dequeue get requeued. If the 
size of the 
+queue is so large as to significantly impact processing due to delays in 
processing the queue, then an alternative approach is to requeue to a special 
retry queue, ensuring
+that retries get a higher level of priority. This may not be necessary, as 
retries happen due to unavailability, and if the queue is long, then resources 
will be
+available, so no retry is needed. If the queue is short, then the re-queue time is 
minimal. The approach is quite similar to the approach used in AMQ 5.7 and 
later, although
+it will work with any JMS provider.
+
+The code base is currently configured to use an explicit dequeue and requeue 
approach that does not depend on features of the JMS provider.
\ No newline at end of file
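
The explicit dequeue and requeue approach described in the README above can be sketched with an in-memory queue. This is a hypothetical model rather than the bundle's implementation: no JMS classes are involved, the `RequeueSketch` class, its `Reader` interface, the dead-letter list and the `MAX_RETRIES` limit are all assumptions made for illustration, and only the `_nr` counter key is taken from `JMSQueueManager`. The point it shows is that a failing message is always removed from the head of the queue and appended to the tail, so it never blocks the messages behind it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory illustration of explicit dequeue and requeue.
class RequeueSketch {
    static final String NRETRIES = "_nr";   // retry counter key, as in JMSQueueManager
    static final long MAX_RETRIES = 3;      // assumed limit, not taken from the source

    /** Hypothetical reader: throwing signals that the message should be retried. */
    interface Reader {
        void onMessage(Map<String, Object> message) throws Exception;
    }

    /**
     * Drains the queue. Every message is removed from the head first. A message
     * whose reader throws is appended to the tail with an incremented counter,
     * or moved to deadLetters once the counter has reached MAX_RETRIES.
     */
    static void drain(Queue<Map<String, Object>> queue, Reader reader,
                      List<Map<String, Object>> deadLetters) {
        Map<String, Object> message;
        while ((message = queue.poll()) != null) {
            try {
                reader.onMessage(message);
            } catch (Exception e) {
                long retries = ((Number) message.get(NRETRIES)).longValue();
                if (retries >= MAX_RETRIES) {
                    deadLetters.add(message);
                } else {
                    Map<String, Object> copy = new HashMap<>(message);
                    copy.put(NRETRIES, retries + 1);
                    queue.add(copy);            // requeue behind the waiting messages
                }
            }
        }
    }

    /** Builds a message with the retry counter initialised to zero. */
    static Map<String, Object> message(String id) {
        Map<String, Object> m = new HashMap<>();
        m.put("id", id);
        m.put(NRETRIES, 0L);
        return m;
    }

    public static void main(String[] args) {
        Queue<Map<String, Object>> queue = new ArrayDeque<>();
        queue.add(message("a"));
        queue.add(message("b"));
        List<String> processed = new ArrayList<>();
        List<Map<String, Object>> dead = new ArrayList<>();
        // "a" fails on its first two deliveries, "b" always succeeds.
        drain(queue, m -> {
            if ("a".equals(m.get("id")) && ((Number) m.get(NRETRIES)).longValue() < 2) {
                throw new IllegalStateException("not ready");
            }
            processed.add((String) m.get("id"));
        }, dead);
        // prints [b, a] dead=0
        System.out.println(processed + " dead=" + dead.size());
    }
}
```

In the run above, "b" is processed before "a" even though it was queued second, which is the trade-off of this approach: strict delivery order is given up so that one failing message cannot stall the queue.
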
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..95a7a87
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,237 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>26</version>
+        <relativePath />
+    </parent>
+
+    <artifactId>org.apache.sling.jms</artifactId>
+    <packaging>bundle</packaging>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>Apache Sling MoM Implementation using JMS with AMQ</name>
+    <description>
+        Provides a JMS Connection provider that works OOTB in a cluster or 
uses a pre-existing AMQ endpoint depending on the configuration.
+    </description>
+
+    <scm>
+        
<connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms</connection>
+        
<developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms</developerConnection>
+        
<url>http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms</url>
+    </scm>
+
+    <properties>
+        <site.jira.version.id>12315369</site.jira.version.id>
+        <sling.java.version>7</sling.java.version>
+        <exam.version>4.4.0</exam.version>
+        <url.version>2.4.5</url.version>
+        <bundle.build.dir>${basedir}/target</bundle.build.dir>
+        
<bundle.file.name>${bundle.build.dir}/${project.build.finalName}.jar</bundle.file.name>
+        <min.port>37000</min.port>
+        <max.port>37999</max.port>
+    </properties>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Embed-Dependency>
+                            activemq-broker,
+                            activemq-client,
+                            activemq-pool,
+                            activemq-protobuf,
+                            activemq-kahadb-store,
+                            activemq-jms-pool,
+                            hawtbuf</Embed-Dependency>
+                        <Import-Package>
+                            org.apache.activemq.jaas*;resolution:=optional,
+                            org.apache.commons.net.ftp*;resolution:=optional,
+                            
org.apache.geronimo.transaction*;resolution:=optional,
+                            org.fusesource.hawtbuf*;resolution:=optional,
+                            org.apache.commons.pool2*;resolution:=optional,
+                            javax.jmdns*;resolution:=optional,
+                            !org.apache.maven*,
+                            *
+
+                        </Import-Package>
+                        <!--
+                        
<Import-Package>com.thoughtworks.xstream*;resolution:=optional,
+                            org.apache.activeio*;resolution:=optional,
+                            org.apache.commons.pool*;resolution:=optional,
+                            org.apache.derby*;resolution:=optional,
+                            org.apache.tools.ant*;resolution:=optional,
+                            org.apache.maven*;resolution:=optional,
+                            org.apache.xbean*;resolution:=optional,
+                            
'=org.apache.xbean.spring.context.v2;resolution:=optional',
+                            org.apache.xpath*;resolution:=optional,
+                            org.codehaus.jam*;resolution:=optional,
+                            org.springframework*;resolution:=optional,
+                            
org.springframework.beans.factory.xml;resolution:=optional,
+                            org.w3c.dom.traversal*;resolution:=optional,
+                            org.apache.commons.net*;resolution:=optional,
+                            org.apache.kahadb*;resolution:=optional,
+                            org.apache.activemq.ra*;resolution:=optional,
+                            
org.apache.geronimo.transaction.manager*;resolution:=optional,
+                            *
+                        </Import-Package>
+                        -->
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>activemq-data/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>
+                    </excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.mom</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- ActiveMQ -->
+        <!-- The released OSGi bundle contains a blueprint config with an 
invalid import that can't be satisfied without a custom
+         bundle, hence these will be embedded. -->
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-pool</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-jms-pool</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>5.13.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq.protobuf</groupId>
+            <artifactId>activemq-protobuf</artifactId>
+            <version>1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.fusesource.hawtbuf</groupId>
+            <artifactId>hawtbuf</artifactId>
+            <version>1.11</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.2.4</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.core</artifactId>
+            <version>6.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.4.0</version>
+            <scope>provided</scope>
+        </dependency>
+      <!-- Testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java 
b/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java
new file mode 100644
index 0000000..bb3798e
--- /dev/null
+++ b/src/main/java/org/apache/sling/amq/ActiveMQConnectionFactoryService.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.amq;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.jms.ConnectionFactoryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+
+/**
+ * Creates a ConnectionFactoryService that makes a pooled  JMS 
ConnectionFactory available to consumers. The implementation
+ * of JMS is provided by ActiveMQ in this instance. If the component is left 
un-configured it will connect to vm://localhost:61616.
+ * If no server is present at that address, the component will create a 
standalone ActiveMQ broker on startup. Without additional
+ * configuration that AMQ Broker will operate standalone. With configuration 
it is possible to configure the broker to become
+ * a member of a multi master AMQ Broker network. Alternatively if a dedicated 
AMQ Broker is required the jms.brokerUri configuration
+ * setting should be adjusted to point to that broker.
+ *
+ * This component works OOTB and in Unit tests with no further action.
+ *
+ * The jms.brokerUri allows configuration of the broker in any way that 
ActiveMQ allows, including xbean and broker.
+ *
+ *
+ * Available URI patterns.
+ *
+ * xbean:org/apache/sling/amq/activemq.xml will result in the Broker searching 
for org/apache/sling/amq/activemq.xml in
+ * the classpath and using that to configure the Broker, see 
http://activemq.apache.org/xml-configuration.html for details
+ * of the format. See that location for an example of the default 
configuration.
+ *
+ *
+ *
+ * broker:tcp://localhost:61616 will create a broker on localhost port 61616 
using the URI configuration format.
+ * See http://activemq.apache.org/broker-configuration-uri.html and 
http://activemq.apache.org/broker-uri.html for
+ * details of the format.
+ *
+ * properties:/foo/bar.properties uses a properties file as per 
http://activemq.apache.org/broker-properties-uri.html
+ *
+ */
+@Component(immediate = true, metatype = true)
+@Service(value=ConnectionFactoryService.class)
+public class ActiveMQConnectionFactoryService implements 
ConnectionFactoryService {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveMQConnectionFactoryService.class);
+    private PooledConnectionFactory pooledConnectionFactory;
+
+    // Where the broker is configured out of the box, the shutdown hook must 
be disabled
+    // so that the deactivate method can perform the shutdown.
+    // This assumes that OSGi does shutdown properly.
+
+    public static final String DEFAULT_BROKER_URI = 
"vm://localhost:61616?broker.useShutdownHook=false";
+    @Property(value = DEFAULT_BROKER_URI)
+    public static final String BROKER_URI = "jms.brokerUri";
+
+
+
+
+    @Activate
+    public void activate(Map<String, Object> props) {
+
+        String brokerURL = (String) props.get(BROKER_URI);
+
+        pooledConnectionFactory = new PooledConnectionFactory(brokerURL);
+        pooledConnectionFactory.start();
+    }
+
+
+    @Deactivate
+    public void deactivate(Map<String, Object> props) {
+
+        LOGGER.info("Stopping ActiveMQ Pooled connection factory");
+        pooledConnectionFactory.stop();
+        pooledConnectionFactory = null;
+    }
+
+
+
+    @Override
+    public ConnectionFactory getConnectionFactory() {
+        return pooledConnectionFactory;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java 
b/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java
new file mode 100644
index 0000000..e14a3f7
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/ConnectionFactoryService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import javax.jms.ConnectionFactory;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * Implementations of this service provide JMS Connection factories. In 
general implementations should work OOTB with no
+ * further configuration and provide an efficient JMS Connection Factory that 
allows sessions to be created with minimal
+ * overhead.
+ */
+public interface ConnectionFactoryService {
+    /**
+     *
+     * Get a connection factory.
+     * @return the connection factory.
+     */
+    ConnectionFactory getConnectionFactory();
+}
diff --git a/src/main/java/org/apache/sling/jms/JMSMessageTypes.java 
b/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
new file mode 100644
index 0000000..a6fcf29
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/JMSMessageTypes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+/**
+ * Created by ieb on 04/04/2016.
+ */
+public enum JMSMessageTypes {
+    /**
+     * A text message containing json.
+     */
+    JSON
+}
diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java 
b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
new file mode 100644
index 0000000..59d888d
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.mom.*;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.jms.*;
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * A JMS implementation of a QueueManager. It will allow callers to add 
messages to named queues, and consumers to read
+ * messages from named queues in order. The component uses a single connection 
to the JMS broker, but dedicated sessions
+ * for each send and for each Queue reader.
+ */
+@Component(immediate = true)
+@Service(value = QueueManager.class)
+public class JMSQueueManager implements QueueManager {
+
+
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSQueueManager.class);
+    private static final String NRETRIES = "_nr";
+
+    @Reference
+    private ConnectionFactoryService connectionFactoryService;
+
+
+    /**
+     * Holds all QueueReader registrations.
+     */
+    @Reference(referenceInterface = QueueReader.class,
+            cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+            policy = ReferencePolicy.DYNAMIC,
+            bind="addReader",
+            unbind="removeReader")
+    private final Map<ServiceReference<QueueReader>, QueueReaderHolder> 
registrations =
+            new ConcurrentHashMap<ServiceReference<QueueReader>, 
QueueReaderHolder>();
+
+    private Connection connection;
+
+    @Activate
+    public synchronized void activate(Map<String, Object> properties) throws 
JMSException {
+        connection = 
connectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+    }
+
+    @Deactivate
+    public synchronized void deactivate(Map<String, Object> properties) throws 
JMSException {
+        for ( Map.Entry<ServiceReference<QueueReader>, QueueReaderHolder> e : 
registrations.entrySet()) {
+            e.getValue().close();
+        }
+        registrations.clear();
+        connection.stop();
+        connection.close();
+    }
+
+
+
+    /**
+     * Add a message to the queue. The message is added to the queue 
transactionally and auto acknowledged.
+     * @param name the name of the queue.
+     * @param message the message to post to the queue.
+     */
+    @Override
+    public void add(@Nonnull Types.QueueName name, @Nonnull Map<String, 
Object> message) {
+        Session session = null;
+        try {
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            message.put(NRETRIES, 0L); // set the number of retries to 0.
+            TextMessage textMessage = 
session.createTextMessage(Json.toJson(message));
+            textMessage.setJMSType(JMSMessageTypes.JSON.toString());
+            LOGGER.info("Sending to {} message {} ", name, textMessage);
+            
session.createProducer(session.createQueue(name.toString())).send(textMessage);
+            session.commit();
+            session.close();
+        } catch (JMSException e) {
+            LOGGER.error("Unable to send message to queue "+name, e);
+            close(session);
+
+        }
+
+    }
+
+
+    /**
+     * quietly close the session.
+     * @param session
+     */
+    private void close(Session session) {
+        if(session != null) {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                LOGGER.warn("Unable to close session ",e);
+            }
+        }
+    }
+
+
+    // Register Readers using OSGi Whiteboard pattern
+    public synchronized  void addReader(ServiceReference<QueueReader> 
serviceRef) {
+        if (registrations.containsKey(serviceRef)) {
+            LOGGER.error("Registration for service reference is already 
present {}",serviceRef);
+            return;
+        }
+        QueueReaderHolder queueReaderHolder = new 
QueueReaderHolder(connection, 
serviceRef.getBundle().getBundleContext().getService(serviceRef), 
getServiceProperties(serviceRef));
+        registrations.put(serviceRef, queueReaderHolder);
+    }
+
+    private Map<String, Object> 
getServiceProperties(ServiceReference<QueueReader> serviceRef) {
+        Map<String, Object> m = new HashMap<String, Object>();
+        for ( String k : serviceRef.getPropertyKeys()) {
+            m.put(k, serviceRef.getProperty(k));
+        }
+        return Collections.unmodifiableMap(m);
+    }
+
+    public synchronized void removeReader(ServiceReference<QueueReader> 
serviceRef) {
+        QueueReaderHolder queueReaderHolder = registrations.remove(serviceRef);
+        if ( queueReaderHolder != null) {
+            queueReaderHolder.close();
+        }
+    }
+
+    private static class QueueReaderHolder implements Closeable {
+        private final JMSQueueSession session;
+
+        public QueueReaderHolder(Connection connection, QueueReader 
queueReader, Map<String, Object> properties) {
+            try {
+                LOGGER.info("Creating Queue holder for {} ", 
queueReader.getClass());
+                String name = (String) 
properties.get(QueueReader.QUEUE_NAME_PROP);
+                checkNotNull(name, "A valid queue name as property " + 
QueueReader.QUEUE_NAME_PROP + " is required for QueueReader registration");
+                if (queueReader instanceof MessageFilter) {
+                    session = new JMSQueueSession(connection, queueReader, 
name, (MessageFilter) queueReader, true, 5);
+                } else {
+                    session = new JMSQueueSession(connection, queueReader, 
name, new MessageFilter() {
+                        @Override
+                        public boolean accept(Types.Name name, Map<String, 
Object> mapMessage) {
+                            return true;
+                        }
+
+                    }, true, 5);
+
+                }
+            } catch (JMSException e) {
+                throw new IllegalArgumentException("Unable to register 
QueueReader with JMS ",e);
+            }
+
+        }
+
+        public void close() {
+            session.close();
+        }
+    }
+
+    private static void checkNotNull(Object v, String message) {
+        if ( v == null) {
+            throw new IllegalArgumentException(message);
+        }
+    }
+
+
+    public static class JMSQueueSession implements Closeable, MessageListener {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSQueueSession.class);
+        private final QueueReader queueReader;
+        private final String name;
+        private final MessageFilter messageFilter;
+        private final Session session;
+        private final MessageConsumer queueConsumer;
+        private final MessageProducer queueProducer;
+        private boolean retryByRequeue;
+        private int maxRetries;
+
+        public JMSQueueSession(Connection connection, QueueReader queueReader, 
String name,  MessageFilter messageFilter, boolean retryByRequeue, int 
maxRetries) throws JMSException {
+            this.queueReader = queueReader;
+            this.name = name;
+            this.messageFilter = messageFilter;
+            this.retryByRequeue = retryByRequeue;
+            this.maxRetries = maxRetries;
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(name);
+            queueConsumer = session.createConsumer(queue);
+            queueProducer = session.createProducer(queue);
+            queueConsumer.setMessageListener(this);
+        }
+
+        @Override
+        public void close() {
+            if ( queueConsumer != null) {
+                try {
+                    queueConsumer.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("Failed to close queue consumer on "+name,e);
+                }
+            }
+            if ( session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("Failed to close queue session on " + name, e);
+                }
+            }
+
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            boolean committed = false;
+            TextMessage textMessage = null;
+            try {
+                try {
+                    LOGGER.info("Got from {} message {} ", name, message);
+                    Destination destination = message.getJMSDestination();
+                    if (destination instanceof Queue) {
+                        Queue queue = (Queue) destination;
+                        if ( 
JMSMessageTypes.JSON.equals(JMSMessageTypes.valueOf(message.getJMSType()))) {
+                            textMessage = (TextMessage) message;
+                            final Map<String, Object> mapMessage = 
Json.toMap(textMessage.getText());
+                            Types.QueueName queueName = 
Types.queueName(queue.getQueueName());
+                            if (queueName.equals(name) && 
messageFilter.accept(queueName, mapMessage)) {
+                                queueReader.onMessage(queueName, mapMessage);
+                                session.commit();
+                                // all ok.
+                                committed = true;
+                                return;
+                            }
+                        }
+                    }
+                } catch (RequeueMessageException e) {
+                    LOGGER.info("QueueReader requested requeue of message ", 
e);
+                    if (retryByRequeue && textMessage != null) {
+                        Map<String, Object> mapMessage = 
Json.toMap(textMessage.getText());
+                        if (((Number) mapMessage.get(NRETRIES)).longValue() < maxRetries) {
+                            mapMessage.put(NRETRIES, ((Number) mapMessage.get(NRETRIES)).longValue() + 1);
+                            TextMessage retryMessage = 
session.createTextMessage(Json.toJson(mapMessage));
+                            
retryMessage.setJMSType(JMSMessageTypes.JSON.toString());
+                            LOGGER.info("Retrying message Sending to {} 
message {} ", name, textMessage);
+                            queueProducer.send(retryMessage);
+                            session.commit();
+                            committed = true;
+                            return;
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                LOGGER.info("Receive failed, leaving it to the provider to requeue if required. ", e);
+            } finally {
+                try {
+                    if (!committed) {
+                        session.rollback();
+                    }
+                } catch (JMSException e) {
+                    LOGGER.info("QueueReader rollback failed. ",e);
+                }
+            }
+            // If onMessage throws an exception JMS will put the message back 
onto the queue.
+            // the delay before it gets retried is a JMS server configuration.
+            throw new IllegalArgumentException("Unable to process message, 
requeue");
+        }
+
+    }
+
+}
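
The retry branch of JMSQueueSession.onMessage reads the NRETRIES counter back from a map produced by Json.toMap, which returns whole numbers as Long. The following is a minimal sketch, not code from this commit, of reading and incrementing such a counter through Number so the check does not depend on the boxed type. The class name RetryCounter and the key value "_nr" are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the commit: illustrates reading a retry
// counter that may be stored as Integer, Long or Double in a decoded message.
public class RetryCounter {
    // Assumed key value for illustration only; the commit defines its own NRETRIES constant.
    static final String NRETRIES = "_nr";

    /** Returns the current retry count, or 0 if the key is missing or not numeric. */
    static long current(Map<String, Object> message) {
        Object v = message.get(NRETRIES);
        return (v instanceof Number) ? ((Number) v).longValue() : 0L;
    }

    /**
     * Increments the counter in place and returns true if the message may be
     * requeued; returns false and leaves the map untouched once maxRetries is reached.
     */
    static boolean incrementIfAllowed(Map<String, Object> message, int maxRetries) {
        long n = current(message);
        if (n >= maxRetries) {
            return false;
        }
        message.put(NRETRIES, n + 1);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> m = new HashMap<String, Object>();
        m.put(NRETRIES, 0L);
        while (incrementIfAllowed(m, 5)) {
            // each pass stands in for one requeue of the message
        }
        System.out.println(current(m));
    }
}
```

Going through Number avoids a ClassCastException when the stored value is a Long and the reading code expects an Integer, or the other way round.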
diff --git a/src/main/java/org/apache/sling/jms/JMSTopicManager.java 
b/src/main/java/org/apache/sling/jms/JMSTopicManager.java
new file mode 100644
index 0000000..e3b0588
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/JMSTopicManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.mom.*;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * This class provides support for sending messages to topics over JMS and 
subscribing to topics. It uses the ConnectionFactoryService
+ * to interact with JMS. There is nothing in
+ */
+@Component(immediate = true)
+@Service(value = TopicManager.class)
+public class JMSTopicManager implements TopicManager {
+
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSTopicManager.class);
+
+
+    /**
+     * Holds all Subscriber registrations.
+     */
+    @Reference(referenceInterface = Subscriber.class,
+            cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+            policy = ReferencePolicy.DYNAMIC,
+            bind="addSubscriber",
+            unbind="removeSubscriber")
+    private final Map<ServiceReference<Subscriber>, SubscriberHolder> 
registrations =
+            new ConcurrentHashMap<ServiceReference<Subscriber>, 
SubscriberHolder>();
+
+    @Reference
+    private ConnectionFactoryService connectionFactoryService;
+    // A single connection is maintained open per instance of this component.
+    private Connection connection;
+    // A single session is used for listening to messages. Separate sessions 
are opened for sending to avoid synchronisation on sending operations.
+    private Session session;
+    private final Object lock = new Object();
+
+    @Activate
+    public synchronized  void activate(Map<String, Object> properties) throws 
JMSException {
+            connection = 
connectionFactoryService.getConnectionFactory().createConnection();
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+    }
+
+    @Deactivate
+    public synchronized  void deactivate(Map<String, Object> properties) 
throws JMSException {
+        for ( Map.Entry<ServiceReference<Subscriber>, SubscriberHolder> e : 
registrations.entrySet()) {
+            e.getValue().close();
+        }
+        registrations.clear();
+        // don't close the session, there is a bug in JMS which means an already closed session won't go quietly
+        // and the hook that shuts down an embedded connection still gets fired when OSGi shuts down even with
+        // a flag to prevent it. connection.stop and close are clean.
+        connection.stop();
+        connection.close();
+    }
+
+
+    @Override
+    public void publish(Types.TopicName name, Types.CommandName commandName, 
Map<String, Object> message) {
+        Session session = null;
+        try {
+            // use a fresh session per message.
+            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            TextMessage textMessage = 
session.createTextMessage(Json.toJson(message));
+            textMessage.setJMSType(JMSMessageTypes.JSON.toString());
+            
session.createProducer(session.createTopic(name.toString())).send(textMessage);
+            session.commit();
+            session.close();
+        } catch (JMSException e) {
+            LOGGER.error("Unable to send message to topic "+name, e);
+            if(session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e1) {
+                    LOGGER.warn("Unable to close session ",e1);
+                }
+            }
+
+        }
+    }
+
+
+    // Register Subscribers using OSGi Whiteboard pattern
+    public synchronized  void addSubscriber(ServiceReference<Subscriber> 
serviceRef) {
+        if (registrations.containsKey(serviceRef)) {
+            LOGGER.error("Registration for service reference is already 
present {}",serviceRef);
+            return;
+        }
+        SubscriberHolder subscriberHolder = new SubscriberHolder(session, 
serviceRef.getBundle().getBundleContext().getService(serviceRef), 
getServiceProperties(serviceRef));
+        registrations.put(serviceRef, subscriberHolder);
+    }
+
+    private Map<String, Object> 
getServiceProperties(ServiceReference<Subscriber> serviceRef) {
+        Map<String, Object> m = new HashMap<String, Object>();
+        for ( String k : serviceRef.getPropertyKeys()) {
+            m.put(k, serviceRef.getProperty(k));
+        }
+        return Collections.unmodifiableMap(m);
+    }
+
+    public synchronized void removeSubscriber(ServiceReference<Subscriber> 
serviceRef) {
+        SubscriberHolder subscriberHolder = registrations.remove(serviceRef);
+        if (subscriberHolder != null) {
+            subscriberHolder.close();
+        }
+    }
+
+    private static class SubscriberHolder implements Closeable {
+
+
+        private final FilteredTopicSubscriber filteredTopicSubscriber;
+
+        public SubscriberHolder(Session session, Subscriber subscriber, 
Map<String, Object> properties) {
+            try {
+                LOGGER.info("Creating Subscriber holder for {} ", 
subscriber.getClass());
+                String[] topicNames = (String[]) 
properties.get(Subscriber.TOPIC_NAMES_PROP);
+
+                if ( topicNames == null || topicNames.length == 0) {
+                    throw new IllegalArgumentException("At least one valid 
topic name in property " + Subscriber.TOPIC_NAMES_PROP + " is required for 
Subscriber registration");
+                }
+                if ( subscriber instanceof MessageFilter) {
+                    filteredTopicSubscriber = new 
FilteredTopicSubscriber(session, subscriber, topicNames, 
(MessageFilter)subscriber);
+                } else {
+                    filteredTopicSubscriber = new 
FilteredTopicSubscriber(session, subscriber, topicNames, new MessageFilter() {
+
+
+                        @Override
+                        public boolean accept(Types.Name name, Map<String, 
Object> mapMessage) {
+                            return true;
+                        }
+                    });
+
+                }
+            } catch (JMSException e) {
+                throw new IllegalArgumentException("Unable to register Subscriber with JMS ",e);
+            }
+
+        }
+
+        public void close() {
+            try {
+                filteredTopicSubscriber.close();
+            } catch (IOException e) {
+                LOGGER.warn("Unable to close topic subscriber ", e);
+            }
+        }
+    }
+
+    /**
+     * This listens to topic messages, and applies the message filter prior to sending to the subscriber.
+     * Although JMS has its own filtering language, this is JMS specific and since we don't want to expose implementation
+     * details in the Jobs API either explicitly or out of band, the JMS specific filters cannot be used. As a replacement the
+     * API provides the MessageFilter API.
+     */
+    private static final class FilteredTopicSubscriber implements Closeable, 
MessageListener {
+        private final Subscriber subscriber;
+        private final MessageFilter filter;
+        private final List<MessageConsumer> consumers = new 
ArrayList<MessageConsumer>();
+
+        public FilteredTopicSubscriber(@Nonnull Session session,
+                                       @Nonnull Subscriber subscriber,
+                                       @Nonnull String[] topicNames,
+                                       @Nonnull MessageFilter filter) throws 
JMSException {
+            this.subscriber = subscriber;
+            this.filter = filter;
+            for (String t : topicNames) {
+                MessageConsumer c = 
session.createConsumer(session.createTopic(t));
+                c.setMessageListener(this);
+                consumers.add(c);
+            }
+        }
+
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                LOGGER.info("Got message {} ", message);
+                Destination destination = message.getJMSDestination();
+                if (destination instanceof Topic) {
+                    Topic topic = (Topic) destination;
+                    String type = message.getJMSType();
+                    if 
(JMSMessageTypes.JSON.equals(JMSMessageTypes.valueOf(type))) {
+                        TextMessage textMessage = (TextMessage) message;
+                        Map<String, Object> mapMessage = 
Json.toMap(textMessage.getText());
+                        Types.TopicName topicName = 
Types.topicName(topic.getTopicName());
+                        if ( filter.accept(topicName, mapMessage) ) {
+                            subscriber.onMessage(topicName, mapMessage);
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                LOGGER.warn("Failed to deliver message ",e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            for (MessageConsumer c : consumers) {
+                try {
+                    LOGGER.info("Closing consumer on dispose {} ",c);
+                    c.close();
+                } catch (JMSException e) {
+                    LOGGER.warn(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+
+
+}
diff --git a/src/main/java/org/apache/sling/jms/Json.java 
b/src/main/java/org/apache/sling/jms/Json.java
new file mode 100644
index 0000000..b4715d2
--- /dev/null
+++ b/src/main/java/org/apache/sling/jms/Json.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class Json {
+
+    public static Map<String, Object> toMap(String text) {
+        JsonElement root = new JsonParser().parse(text);
+        return toMapValue(root);
+    }
+
+    private static <T> T toMapValue(JsonElement element) {
+        if (element.isJsonObject()) {
+            return (T) toMapValue(element.getAsJsonObject());
+        } else if (element.isJsonArray()) {
+            return (T) toMapValue(element.getAsJsonArray());
+        } else if (element.isJsonNull()) {
+            return null;
+        } else if (element.isJsonPrimitive()) {
+            return (T) toMapValue(element.getAsJsonPrimitive());
+        }
+        throw new IllegalArgumentException("Encountered JsonElement that is 
not an object, array, primitive or null: "+element);
+    }
+    private static <T> T toMapValue(JsonArray array) {
+        List<Object> list = new ArrayList<Object>();
+        for( JsonElement e : array) {
+            list.add(toMapValue(e));
+        }
+        return (T) list;
+    }
+
+    private static <T> T toMapValue(JsonObject obj) {
+        Map<String, Object> m = new HashMap<String, Object>();
+        for (Map.Entry<String, JsonElement> e : obj.entrySet()) {
+            m.put(e.getKey(), toMapValue(e.getValue()));
+        }
+        return (T) m;
+    }
+
+    private static <T> T toMapValue(JsonPrimitive p) {
+        if (p.isString()) {
+            return (T) p.getAsString();
+        } else if (p.isBoolean()) {
+            return (T) ((Boolean)p.getAsBoolean());
+        } else if (p.isNumber()) {
+            double d = p.getAsDouble();
+            if (Math.floor(d) == d) {
+                return (T)((Long)p.getAsLong());
+            }
+            return (T)((Double)d);
+        } else {
+            return null;
+        }
+    }
+
+    public static String toJson(Map<String, Object> message) {
+        return new Gson().toJson(message);
+    }
+
+
+}
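
Json.toMapValue(JsonPrimitive) above decides between Long and Double by testing whether the double value is whole. The following is a minimal stand-alone sketch of that rule without the Gson dependency. The class name NumberCoercion is hypothetical, and the sketch narrows with a cast where the commit calls getAsLong().

```java
// Hypothetical stand-alone illustration, not part of the commit: mirrors the
// whole-number test used when decoding JSON numbers into map values.
public class NumberCoercion {
    /** Whole-valued doubles become Long; everything else stays Double. */
    static Object coerce(double d) {
        if (Math.floor(d) == d) {
            // The commit uses JsonPrimitive.getAsLong() here; a cast is used
            // in this sketch to avoid depending on Gson.
            return Long.valueOf((long) d);
        }
        return Double.valueOf(d);
    }

    public static void main(String[] args) {
        System.out.println(coerce(3.0).getClass().getSimpleName());
        System.out.println(coerce(3.5).getClass().getSimpleName());
    }
}
```

One consequence of this rule is that a value put into a message as an Integer, or as a whole-valued Double such as 1.0, comes back as a Long after a toJson/toMap round trip, so consumers are safer reading numeric values through Number.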
diff --git a/src/main/resources/org/apache/sling/amq/activemq.xml 
b/src/main/resources/org/apache/sling/amq/activemq.xml
new file mode 100644
index 0000000..d6313d5
--- /dev/null
+++ b/src/main/resources/org/apache/sling/amq/activemq.xml
@@ -0,0 +1,170 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!--  START SNIPPET: example  -->
+<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:amq="http://activemq.apache.org/schema/core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd 
http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd">
+    <!--
+     Allows us to use system properties as variables in this configuration file
+    -->
+    <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+        <property name="locations">
+            <value>file:${activemq.conf}/credentials.properties</value>
+        </property>
+    </bean>
+    <!--
+
+            The <broker> element is used to configure the ActiveMQ broker.
+
+    -->
+    <broker xmlns="http://activemq.apache.org/schema/core" 
brokerName="localhost" dataDirectory="${activemq.data}">
+        <!--
+
+                    For better performance use VM cursor and small memory limit.
+                    For more information, see:
+
+                    http://activemq.apache.org/message-cursors.html
+
+                    Also, if your producer is "hanging", it's probably due to 
producer flow control.
+                    For more information, see:
+                    http://activemq.apache.org/producer-flow-control.html
+
+        -->
+        <destinationPolicy>
+            <policyMap>
+                <policyEntries>
+                    <policyEntry topic=">" producerFlowControl="true">
+                        <!--
+                         The constantPendingMessageLimitStrategy is used to prevent
+                                                 slow topic consumers from blocking producers and affecting other consumers
+                                                 by limiting the number of messages that are retained.
+                                                 For more information, see:
+
+                                                 
http://activemq.apache.org/slow-consumer-handling.html
+
+
+                        -->
+                        <pendingMessageLimitStrategy>
+                            <constantPendingMessageLimitStrategy limit="1000"/>
+                        </pendingMessageLimitStrategy>
+                    </policyEntry>
+                    <policyEntry queue=">" producerFlowControl="true" 
memoryLimit="1mb">
+                        <!--
+                         Use VM cursor for better latency
+                                               For more information, see:
+
+                                               
http://activemq.apache.org/message-cursors.html
+
+                                          <pendingQueuePolicy>
+                                            <vmQueueCursor/>
+                                          </pendingQueuePolicy>
+
+                        -->
+                    </policyEntry>
+                </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+        <!--
+
+                    The managementContext is used to configure how ActiveMQ is 
exposed in
+                    JMX. By default, ActiveMQ uses the MBean server that is 
started by
+                    the JVM. For more information, see:
+
+                    http://activemq.apache.org/jmx.html
+
+        -->
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+        <!--
+
+                    Configure message persistence for the broker. The default 
persistence
+                    mechanism is the KahaDB store (identified by the kahaDB 
tag).
+                    For more information, see:
+
+                    http://activemq.apache.org/persistence.html
+
+        -->
+        <persistenceAdapter>
+            <kahaDB directory="${activemq.data}/kahadb"/>
+        </persistenceAdapter>
+        <!--
+
+                    The systemUsage controls the maximum amount of space the 
broker will
+                    use before slowing down producers. For more information, 
see:
+                    http://activemq.apache.org/producer-flow-control.html
+                    If using ActiveMQ embedded - the following limits could 
safely be used:
+
+                <systemUsage>
+                    <systemUsage>
+                        <memoryUsage>
+                            <memoryUsage limit="20 mb"/>
+                        </memoryUsage>
+                        <storeUsage>
+                            <storeUsage limit="1 gb"/>
+                        </storeUsage>
+                        <tempUsage>
+                            <tempUsage limit="100 mb"/>
+                        </tempUsage>
+                    </systemUsage>
+                </systemUsage>
+
+        -->
+        <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage limit="64 mb"/>
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="100 gb"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="50 gb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+        <!--
+
+                    The transport connectors expose ActiveMQ over a given 
protocol to
+                    clients and other brokers. For more information, see:
+
+                    http://activemq.apache.org/configuring-transports.html
+
+        -->
+        <transportConnectors>
+            <!--
+             DOS protection, limit concurrent connections to 1000 and frame 
size to 100MB
+            -->
+            <transportConnector name="openwire" 
uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+            <transportConnector name="amqp" 
uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+        </transportConnectors>
+        <!--
+         destroy the spring context on shutdown to stop jetty
+        -->
+        <shutdownHooks>
+            <bean xmlns="http://www.springframework.org/schema/beans" 
class="org.apache.activemq.hooks.SpringContextHook"/>
+        </shutdownHooks>
+    </broker>
+    <!--
+
+            Enable web consoles, REST and Ajax APIs and demos
+
+            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
+
+    -->
+    <import resource="jetty.xml"/>
+</beans>
+        <!--  END SNIPPET: example  -->
\ No newline at end of file
diff --git a/src/main/resources/org/apache/sling/amq/credentials.properties 
b/src/main/resources/org/apache/sling/amq/credentials.properties
new file mode 100644
index 0000000..b8be66e
--- /dev/null
+++ b/src/main/resources/org/apache/sling/amq/credentials.properties
@@ -0,0 +1,22 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# Defines credentials that will be used by components (like web console) to 
access the broker
+
+activemq.username=system
+activemq.password=manager
+guest.password=password
\ No newline at end of file
diff --git a/src/main/resources/org/apache/sling/amq/jetty.xml 
b/src/main/resources/org/apache/sling/amq/jetty.xml
new file mode 100644
index 0000000..bc03f77
--- /dev/null
+++ b/src/main/resources/org/apache/sling/amq/jetty.xml
@@ -0,0 +1,144 @@
+<!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+        license agreements. See the NOTICE file distributed with this work for 
additional
+        information regarding copyright ownership. The ASF licenses this file 
to You under
+        the Apache License, Version 2.0 (the "License"); you may not use this 
file except in
+        compliance with the License. You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0 Unless required by 
applicable law or
+        agreed to in writing, software distributed under the License is 
distributed on an
+        "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or
+        implied. See the License for the specific language governing 
permissions and
+        limitations under the License.
+
+-->
+<!--
+
+        An embedded servlet engine for serving up the Admin consoles, REST and 
Ajax APIs and
+        some demos. Include this file in your configuration to enable ActiveMQ 
web components,
+        e.g. <import resource="jetty.xml"/>
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd">
+    <bean id="securityLoginService" 
class="org.eclipse.jetty.security.HashLoginService">
+        <property name="name" value="ActiveMQRealm"/>
+        <property name="config" 
value="${activemq.conf}/jetty-realm.properties"/>
+    </bean>
+    <bean id="securityConstraint" 
class="org.eclipse.jetty.util.security.Constraint">
+        <property name="name" value="BASIC"/>
+        <property name="roles" value="user,admin"/>
+        <property name="authenticate" value="true"/>
+    </bean>
+    <bean id="adminSecurityConstraint" 
class="org.eclipse.jetty.util.security.Constraint">
+        <property name="name" value="BASIC"/>
+        <property name="roles" value="admin"/>
+        <property name="authenticate" value="true"/>
+    </bean>
+    <bean id="securityConstraintMapping" 
class="org.eclipse.jetty.security.ConstraintMapping">
+        <property name="constraint" ref="securityConstraint"/>
+        <property name="pathSpec" value="/,*.jsp"/>
+    </bean>
+    <bean id="adminSecurityConstraintMapping" 
class="org.eclipse.jetty.security.ConstraintMapping">
+        <property name="constraint" ref="adminSecurityConstraint"/>
+        <property name="pathSpec" value="*.action"/>
+    </bean>
+    <bean id="securityHandler" 
class="org.eclipse.jetty.security.ConstraintSecurityHandler">
+        <property name="loginService" ref="securityLoginService"/>
+        <property name="authenticator">
+            <bean 
class="org.eclipse.jetty.security.authentication.BasicAuthenticator"/>
+        </property>
+        <property name="constraintMappings">
+            <list>
+                <ref bean="adminSecurityConstraintMapping"/>
+                <ref bean="securityConstraintMapping"/>
+            </list>
+        </property>
+        <property name="handler">
+            <bean id="sec" 
class="org.eclipse.jetty.server.handler.HandlerCollection">
+                <property name="handlers">
+                    <list>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/hawtio"/>
+                            <property name="war" 
value="${activemq.home}/webapps/hawtio"/>
+                            <property name="logUrlOnStart" value="true"/>
+                        </bean>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/admin"/>
+                            <property name="resourceBase" 
value="${activemq.home}/webapps/admin"/>
+                            <property name="logUrlOnStart" value="true"/>
+                        </bean>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/fileserver"/>
+                            <property name="resourceBase" 
value="${activemq.home}/webapps/fileserver"/>
+                            <property name="logUrlOnStart" value="true"/>
+                            <property name="parentLoaderPriority" 
value="true"/>
+                        </bean>
+                        <bean class="org.eclipse.jetty.webapp.WebAppContext">
+                            <property name="contextPath" value="/api"/>
+                            <property name="resourceBase" 
value="${activemq.home}/webapps/api"/>
+                            <property name="logUrlOnStart" value="true"/>
+                        </bean>
+                        <bean 
class="org.eclipse.jetty.server.handler.ResourceHandler">
+                            <property name="directoriesListed" value="false"/>
+                            <property name="welcomeFiles">
+                                <list>
+                                    <value>index.html</value>
+                                </list>
+                            </property>
+                            <property name="resourceBase" 
value="${activemq.home}/webapps/"/>
+                        </bean>
+                        <bean id="defaultHandler" 
class="org.eclipse.jetty.server.handler.DefaultHandler">
+                            <property name="serveIcon" value="false"/>
+                        </bean>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
+    <bean id="rewrite" 
class="org.eclipse.jetty.rewrite.handler.RewriteHandler">
+        <property name="rules">
+            <set>
+                <bean 
class="org.eclipse.jetty.rewrite.handler.RedirectRegexRule">
+                    <property name="regex" value="/api/jolokia(.*)"/>
+                    <property name="replacement" value="/hawtio/jolokia$1"/>
+                </bean>
+            </set>
+        </property>
+    </bean>
+    <bean id="contexts" 
class="org.eclipse.jetty.server.handler.ContextHandlerCollection"></bean>
+    <bean id="Server" class="org.eclipse.jetty.server.Server" 
init-method="start" destroy-method="stop">
+        <property name="connectors">
+            <list>
+                <bean id="Connector" 
class="org.eclipse.jetty.server.nio.SelectChannelConnector">
+                    <property name="port" value="8161"/>
+                </bean>
+                <!--
+
+                                    Enable this connector if you wish to use 
https with web console
+
+                -->
+                <!--
+
+                                <bean id="SecureConnector" 
class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
+                                    <property name="port" value="8162" />
+                                    <property name="keystore" 
value="file:${activemq.conf}/broker.ks" />
+                                    <property name="password" value="password" 
/>
+                                </bean>
+
+                -->
+            </list>
+        </property>
+        <property name="handler">
+            <bean id="handlers" 
class="org.eclipse.jetty.server.handler.HandlerCollection">
+                <property name="handlers">
+                    <list>
+                        <ref bean="rewrite"/>
+                        <ref bean="contexts"/>
+                        <ref bean="securityHandler"/>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java 
b/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
new file mode 100644
index 0000000..0a1da45
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.amq;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jms.*;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class ActiveMQConnectionFactoryServiceTest {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveMQConnectionFactoryServiceTest.class);
+
+    @Test
+    public void testGetConnectionFactory() throws Exception {
+        LOGGER.info("Starting test");
+        ActiveMQConnectionFactoryService cfs = 
ActiveMQConnectionFactoryServiceTest.activate(null);
+        ConnectionFactory cf = cfs.getConnectionFactory();
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Topic t = session.createTopic("testTopic");
+        MessageConsumer consumer = session.createConsumer(t);
+        LOGGER.info("Starting connection");
+        connection.start();
+        LOGGER.info("Connection started.. sending message");
+        session.createProducer(t).send(session.createTextMessage("testing with 
a message"));
+        session.commit();
+        LOGGER.info("Message sent ... receiving message");
+        Message m = consumer.receive(5000);
+        LOGGER.info("Message received");
+        assertTrue(m instanceof TextMessage);
+        assertEquals("testing with a message", ((TextMessage)m).getText());
+        session.close();
+        connection.stop();
+
+        deactivate(cfs);
+    }
+
+    public static void deactivate(@Nonnull ActiveMQConnectionFactoryService 
cfs) {
+        cfs.deactivate(new HashMap<String, Object>());
+    }
+
+    @Nonnull
+    public static ActiveMQConnectionFactoryService activate(@Nullable 
Map<String, Object> props) {
+        ActiveMQConnectionFactoryService amqConnectionFactoryService = new 
ActiveMQConnectionFactoryService();
+        if ( props == null ) {
+            props = new HashMap<String, Object>();
+            props.put(ActiveMQConnectionFactoryService.BROKER_URI, 
ActiveMQConnectionFactoryService.DEFAULT_BROKER_URI);
+        }
+        amqConnectionFactoryService.activate(props);
+        return amqConnectionFactoryService;
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java 
b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
new file mode 100644
index 0000000..ddf20a7
--- /dev/null
+++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jms;
+
+import org.apache.sling.amq.ActiveMQConnectionFactoryService;
+import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest;
+import org.apache.sling.mom.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.lang.reflect.Field;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by ieb on 01/04/2016.
+ */
+public class JMSQueueManagerTest {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSQueueManagerTest.class);
+    private ActiveMQConnectionFactoryService amqConnectionFactoryService;
+    private JMSQueueManager jmsQueueManager;
+    private Map<String, Object> testMap;
+    private volatile boolean passed;
+    private volatile int ndeliveries;
+
+    @Mock
+    private ServiceReference<QueueReader> serviceReference;
+    @Mock
+    private Bundle bundle;
+    @Mock
+    private BundleContext bundleContext;
+    private Map<String, Object> serviceProperties = new HashMap<String, 
Object>();
+
+    public JMSQueueManagerTest() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        Mockito.when(serviceReference.getBundle()).thenReturn(bundle);
+        Mockito.when(bundle.getBundleContext()).thenReturn(bundleContext);
+        Mockito.when(serviceReference.getPropertyKeys()).thenAnswer(new 
Answer<String[]>() {
+            @Override
+            public String[] answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                return (String[]) serviceProperties.keySet().toArray(new 
String[serviceProperties.size()]);
+            }
+        });
+        
Mockito.when(serviceReference.getProperty(Mockito.anyString())).thenAnswer(new 
Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                return 
serviceProperties.get(invocationOnMock.getArguments()[0]);
+            }
+        });
+        amqConnectionFactoryService = 
ActiveMQConnectionFactoryServiceTest.activate(null);
+        jmsQueueManager = 
JMSQueueManagerTest.activate(amqConnectionFactoryService);
+        testMap = JsonTest.createTestMap();
+        passed = false;
+
+    }
+
+
+    private static JMSQueueManager activate(ActiveMQConnectionFactoryService 
amqConnectionFactoryService) throws NoSuchFieldException, 
IllegalAccessException, JMSException {
+        JMSQueueManager jmsQueueManager = new JMSQueueManager();
+        setPrivate(jmsQueueManager, "connectionFactoryService", 
amqConnectionFactoryService);
+        jmsQueueManager.activate(new HashMap<String, Object>());
+        return jmsQueueManager;
+
+    }
+
+    private static void setPrivate(Object object, String name, Object value) 
throws NoSuchFieldException, IllegalAccessException {
+        Field field = object.getClass().getDeclaredField(name);
+        if ( !field.isAccessible()) {
+            field.setAccessible(true);
+        }
+        field.set(object, value);
+    }
+
+
+    @After
+    public void after() throws JMSException {
+        JMSQueueManagerTest.deactivate(jmsQueueManager);
+        
ActiveMQConnectionFactoryServiceTest.deactivate(amqConnectionFactoryService);
+    }
+
+    public static void deactivate(JMSQueueManager jmsQueueManager) throws 
JMSException {
+        jmsQueueManager.deactivate(new HashMap<String, Object>());
+    }
+
+    @Test
+    public void testQueue() throws JMSException, InterruptedException {
+        // clean the queue out of messages from earlier tests, which may have 
failed.
+        final String queueName = "testQueue";
+
+        emptyQueue(queueName);
+        // make the test map unique.
+        testMap.put("testing", queueName + System.currentTimeMillis());
+        jmsQueueManager.add(Types.queueName(queueName), testMap);
+
+        checkMessagesInQueue(queueName, 1);
+
+        ndeliveries = 0;
+        QueueReader queueReader = new QueueReader() {
+            @Override
+            public void onMessage(Types.QueueName queueName, Map<String, 
Object> message) throws RequeueMessageException {
+                ndeliveries++;
+                JsonTest.checkEquals(testMap, message);
+                passed = true;
+            }
+        };
+
+        serviceProperties.clear();
+        serviceProperties.put(QueueReader.QUEUE_NAME_PROP, queueName);
+
+        
Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(queueReader);
+        jmsQueueManager.addReader(serviceReference);
+
+
+
+
+        waitForPassed(1000);
+        checkMessagesInQueue(queueName, 0);
+        waitForErrors(1000);
+
+        jmsQueueManager.removeReader(serviceReference);
+        assertEquals(1, ndeliveries);
+
+
+    }
+
+
+    private void waitForErrors(long t) throws InterruptedException {
+        Thread.sleep(t);
+    }
+
+    private boolean waitForPassed(long t) {
+        long end = System.currentTimeMillis() + t;
+        while(System.currentTimeMillis() < end) {
+            if (passed) {
+                return true;
+            } else {
+                Thread.yield();
+            }
+        }
+        LOGGER.info("Message not received after " + t + " ms");
+        return false;
+    }
+
+    private void checkMessagesInQueue(String name, int expected) throws 
JMSException {
+        Connection connection = 
amqConnectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name);
+        QueueBrowser browser = session.createBrowser(queue);
+        int n = 0;
+        for(Enumeration e = browser.getEnumeration(); e.hasMoreElements(); ) {
+            Message m = (Message) e.nextElement();
+            LOGGER.info("Message at {} is {} ", n,m);
+            n++;
+        }
+        browser.close();
+        session.close();
+        connection.stop();
+        assertEquals(expected, n);
+    }
+
+    @Test
+    public void testQueueReject() throws JMSException, InterruptedException {
+        // clean the queue out of messages from earlier tests, which may have 
failed.
+        final String queueName = "testQueueReject";
+        emptyQueue(queueName);
+        // make the test map unique; if the dequeue fails, then the message 
won't be the first.
+        testMap.put("testing", queueName + System.currentTimeMillis());
+        LOGGER.info("Sending message to queue");
+        jmsQueueManager.add(Types.queueName(queueName), testMap);
+        LOGGER.info("Sent message to queue ... receiving from queue");
+
+        checkMessagesInQueue(queueName, 1);
+
+        ndeliveries = 0;
+        QueueReader queueReader = new QueueReader() {
+            @Override
+            public void onMessage(Types.QueueName queueName, Map<String, 
Object> message) throws RequeueMessageException {
+                JsonTest.checkEquals(testMap, message);
+                ndeliveries++;
+                if ( ndeliveries == 1) {
+                    LOGGER.info("Requesting requeue of message");
+                    throw new RequeueMessageException("Requeuing");
+                } else if ( ndeliveries == 2) {
+                    LOGGER.info("Got message, accepting with no retry.");
+                    passed = true;
+                } else if ( ndeliveries > 2) {
+                    fail("Multiple delivered");
+                }
+            }
+        };
+
+        serviceProperties.clear();
+        serviceProperties.put(QueueReader.QUEUE_NAME_PROP, queueName);
+
+        
Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(queueReader);
+        jmsQueueManager.addReader(serviceReference);
+
+
+
+        waitForPassed(30000);
+
+        jmsQueueManager.removeReader(serviceReference);
+        checkMessagesInQueue(queueName, 0);
+        assertEquals(2, ndeliveries);
+
+
+    }
+
+    private void dumpQueue(String name) throws JMSException {
+        Connection connection = 
amqConnectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(name);
+        QueueBrowser browser = session.createBrowser(queue);
+        LOGGER.info("Starting to dump queue {} ", name);
+        int n = 0;
+        for ( Enumeration messages = browser.getEnumeration();  
messages.hasMoreElements(); ) {
+            Message m = (Message) messages.nextElement();
+            LOGGER.info("Message at {}  is {} ", n, m);
+            n++;
+        }
+        LOGGER.info("Done dump queue {} ", name);
+        browser.close();
+        session.close();
+        connection.stop();
+
+    }
+
+    private void emptyQueue(String name) throws JMSException {
+        dumpQueue(name);
+        Connection connection = 
amqConnectionFactoryService.getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(name);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        for (;;) {
+            Message m = consumer.receive(100);
+            if ( m == null) {
+                LOGGER.info("No more messages in queue {} ", name);
+                break;
+            }
+            LOGGER.info("Got message  {}",m);
+            m.acknowledge();
+            session.commit();
+        }
+        boolean shouldFail = false;
+        QueueBrowser browser = session.createBrowser(queue);
+        for ( Enumeration messages = browser.getEnumeration(); 
messages.hasMoreElements(); ) {
+            Message m = (Message) messages.nextElement();
+            LOGGER.info("Queued message {} ", m);
+            shouldFail = true;
+        }
+        browser.close();
+        if ( shouldFail) {
+            fail("Queue was not emptied as expected");
+        }
+        consumer.close();
+        session.close();
+        connection.stop();
+    }
+
+}
\ No newline at end of file
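
Note on JMSQueueManagerTest: waitForPassed spins on a plain boolean field that the
QueueReader callback sets, presumably from a broker delivery thread. A minimal
sketch of one alternative, assuming delivery does happen on another thread, is to
block on a java.util.concurrent.CountDownLatch with a timeout. LatchWaitSketch and
deliverAndWait are hypothetical names that are not part of this commit, and the
worker thread below only stands in for the JMS callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of the commit under review.
final class LatchWaitSketch {

    /**
     * Starts a worker that signals after delayMillis (standing in for a
     * message callback), then waits up to timeoutMillis for that signal.
     * Returns true if the signal arrived in time, false on timeout.
     */
    static boolean deliverAndWait(final long delayMillis, long timeoutMillis)
            throws InterruptedException {
        final CountDownLatch delivered = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                delivered.countDown(); // what onMessage would do instead of passed = true
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive after a timeout
        worker.start();
        // await() blocks without spinning and guarantees visibility of the signal.
        return delivered.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fast delivery seen: " + deliverAndWait(10, 2000));
        System.out.println("slow delivery seen: " + deliverAndWait(2000, 50));
    }
}
```

Compared with the Thread.yield() loop, the latch avoids burning a core while
waiting and does not depend on the field being volatile.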
diff --git a/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java 
b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
new file mode 100644
index 0000000..71004e7
--- /dev/null
+++ b/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jms;
+
+import org.apache.sling.amq.ActiveMQConnectionFactoryService;
+import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest;
+import org.apache.sling.mom.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class JMSTopicManagerTest {
+
+    private static final long MESSAGE_LATENCY = 1000;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSTopicManagerTest.class);
+    private JMSTopicManager jsmTopicManager;
+    private ActiveMQConnectionFactoryService amqConnectionFactoryService;
+    private Map<String, Object> testMap;
+    private volatile boolean passed;
+    private long lastSent;
+    @Mock
+    private ServiceReference<Subscriber> serviceReference;
+    @Mock
+    private Bundle bundle;
+    @Mock
+    private BundleContext bundleContext;
+    private Map<String, Object> serviceProperties = new HashMap<String, 
Object>();
+
+    public  JMSTopicManagerTest() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Before
+    public void before() throws NoSuchFieldException, IllegalAccessException, 
JMSException {
+        Mockito.when(serviceReference.getBundle()).thenReturn(bundle);
+        Mockito.when(bundle.getBundleContext()).thenReturn(bundleContext);
+        Mockito.when(serviceReference.getPropertyKeys()).thenAnswer(new 
Answer<String[]>() {
+            @Override
+            public String[] answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                return (String[]) serviceProperties.keySet().toArray(new 
String[serviceProperties.size()]);
+            }
+        });
+        
Mockito.when(serviceReference.getProperty(Mockito.anyString())).thenAnswer(new 
Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                return 
serviceProperties.get(invocationOnMock.getArguments()[0]);
+            }
+        });
+        amqConnectionFactoryService = 
ActiveMQConnectionFactoryServiceTest.activate(null);
+        jsmTopicManager = 
JMSTopicManagerTest.activate(amqConnectionFactoryService);
+        testMap = JsonTest.createTestMap();
+        passed = false;
+    }
+
+    public static JMSTopicManager activate(ActiveMQConnectionFactoryService 
amqConnectionFactoryService) throws NoSuchFieldException, 
IllegalAccessException, JMSException {
+        JMSTopicManager jsmTopicManager = new JMSTopicManager();
+        setPrivate(jsmTopicManager, "connectionFactoryService", 
amqConnectionFactoryService);
+        jsmTopicManager.activate(new HashMap<String, Object>());
+        return jsmTopicManager;
+
+    }
+
+    private static void setPrivate(Object object, String name, Object value) 
throws NoSuchFieldException, IllegalAccessException {
+        Field field = object.getClass().getDeclaredField(name);
+        if ( !field.isAccessible()) {
+            field.setAccessible(true);
+        }
+        field.set(object, value);
+    }
+
+    @After
+    public void after() throws JMSException {
+        JMSTopicManagerTest.deactivate(jsmTopicManager);
+        
ActiveMQConnectionFactoryServiceTest.deactivate(amqConnectionFactoryService);
+    }
+
+    public static void deactivate(JMSTopicManager jsmTopicManager) throws 
JMSException {
+        jsmTopicManager.deactivate(new HashMap<String, Object>());
+    }
+
+
+    /**
+     * Test a working publish operation, read the message and check all ok. 
Will try and read the message for 1s. Normally messages
+     * arrive within 15ms.
+     * @throws Exception
+     */
+    @Test
+    public void testPublish() throws Exception {
+        // make the test map unique.
+        testMap.put("testing", "testPublish" + System.currentTimeMillis());
+
+        addSubscriber(new String[]{"testtopic"}, true);
+
+        jsmTopicManager.publish(Types.topicName("testtopic"), 
Types.commandName("testcommand"), testMap);
+        lastSent = System.currentTimeMillis();
+        assertTrue(waitForPassed(MESSAGE_LATENCY));
+
+        removeSubscriber();
+    }
+
+
+    private void addSubscriber(String[] topics, boolean match) {
+
+        Subscriber subscriber = new TestingSubscriber(this, match, topics);
+
+        serviceProperties.clear();
+        serviceProperties.put(Subscriber.TOPIC_NAMES_PROP, topics);
+
+        
Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(subscriber);
+        jsmTopicManager.addSubscriber(serviceReference);
+
+    }
+
+    private void removeSubscriber() {
+        jsmTopicManager.removeSubscriber(serviceReference);
+    }
+
+
+    /**
+     * Test that a message sent with the wrong topic doesn't arrive, filtered 
by the topic inside the jmsTopicManager.
+     * @throws Exception
+     */
+    @Test
+    public void testFilterdByTopic() throws Exception {
+        // make the test map unique.
+        testMap.put("testing", "testFilterdByTopic" + 
System.currentTimeMillis());
+        addSubscriber(new String[]{"testtopic"}, false);
+
+        lastSent = System.currentTimeMillis();
+        assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a 
message at all
+
+        removeSubscriber();
+    }
+
+    /**
+     * Check that a message sent to the correct topic is filtered by the 
MessageFilter.
+     * The test waits 1s for the message to arrive. If testPublish does not 
fail, message
+     * latency is < 1s.
+     * @throws Exception
+     */
+    @Test
+    public void testFilterdByFilter() throws Exception {
+        // make the test map unique.
+        testMap.put("testing", "testFilterdByFilter" + 
System.currentTimeMillis());
+        addSubscriber(new String[]{"testtopic"}, false);
+
+        jsmTopicManager.publish(Types.topicName("testtopic"), 
Types.commandName("testcommand"), testMap);
+        lastSent = System.currentTimeMillis();
+        assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a 
message at all
+
+        removeSubscriber();
+    }
+
+
+    private boolean waitForPassed(long t) {
+        long end = System.currentTimeMillis() + t;
+        while(System.currentTimeMillis() < end) {
+            if (passed) {
+                return true;
+            } else {
+                Thread.yield();
+            }
+        }
+        LOGGER.info("Message not received after "+t+" ms");
+        return false;
+    }
+
+
+    private static class TestingSubscriber implements Subscriber, 
MessageFilter {
+        private JMSTopicManagerTest test;
+        private final boolean accept;
+        private final Set<Types.Name> topicnames;
+
+        public TestingSubscriber(JMSTopicManagerTest test, boolean accept, 
String[] topicname) {
+            this.test = test;
+            this.accept = accept;
+            this.topicnames = new HashSet<Types.Name>();
+            for(String t : topicname) {
+                topicnames.add(Types.topicName(t));
+            }
+        }
+
+        @Override
+        public void onMessage(Types.TopicName topic, Map<String, Object> 
message) {
+            LOGGER.info("Got message in 
"+(System.currentTimeMillis()-test.lastSent)+" ms");
+            JsonTest.checkEquals(test.testMap, message);
+            test.passed = true;
+        }
+
+        @Override
+        public boolean accept(Types.Name name, Map<String, Object> mapMessage) 
{
+            return topicnames.contains(name) == accept;
+        }
+
+
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/jms/JsonTest.java b/src/test/java/org/apache/sling/jms/JsonTest.java
new file mode 100644
index 0000000..6880c5b
--- /dev/null
+++ b/src/test/java/org/apache/sling/jms/JsonTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jms;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by ieb on 31/03/2016.
+ */
+public class JsonTest {
+
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JsonTest.class);
+    private Map<String, Object> testMap;
+
+    @Before
+    public void setup() {
+        testMap = JsonTest.createTestMap();
+    }
+
+    public static Map<String,Object> createTestMap() {
+        Map<String, Object> testMap = new HashMap<String, Object>();
+        Map<String, Object> innerTestMap = new HashMap<String, Object>();
+        Map<String, Object> inner2TestMap = new HashMap<String, Object>();
+        Map<String, Object> listMap = new HashMap<String, Object>();
+
+        listMap.put("listMaplong",100L);
+        listMap.put("listMapboolean",true);
+        listMap.put("listMapstring","A String");
+        listMap.put("listMapdouble",1.001D);
+
+        testMap.put("long",100L);
+        testMap.put("boolean",true);
+        testMap.put("string","A String");
+        testMap.put("double",1.001D);
+        testMap.put("map",innerTestMap);
+        innerTestMap.put("innerlong",100L);
+        innerTestMap.put("innerboolean",true);
+        innerTestMap.put("innerstring","A String");
+        innerTestMap.put("innerdouble",1.001D);
+        innerTestMap.put("innermap",inner2TestMap);
+        inner2TestMap.put("inner2long",100L);
+        inner2TestMap.put("inner3boolean",true);
+        inner2TestMap.put("inner3string","A String");
+        inner2TestMap.put("inner3double",1.001D);
+        inner2TestMap.put("inner3list", Arrays.asList("string1","string2", "string2"));
+        inner2TestMap.put("inner3listofMaps", Arrays.asList(listMap, listMap, listMap));
+        return testMap;
+    }
+
+    @Test
+    public void testJson() throws Exception {
+        checkEquals(testMap, Json.toMap(Json.toJson(testMap)));
+
+    }
+
+    public static void checkEquals(Map<String, Object> expected, Map<String, Object> actual) {
+        LOGGER.info("Expected {}", expected);
+        LOGGER.info("Actual   {}", actual);
+        for(Map.Entry<String, Object> e : expected.entrySet()) {
+            if ( e.getValue() instanceof Map ) {
+                checkEquals((Map<String, Object>) e.getValue(), (Map<String, Object>) actual.get(e.getKey()));
+            } else if ( e.getValue() instanceof List ) {
+                checkEquals((List<Object>) e.getValue(), (List<Object>) actual.get(e.getKey()));
+            } else {
+                if ( e.getValue() == null && actual.get(e.getKey()) != null ) {
+                    LOGGER.info("Expected value for {}  is null but actual is {} ",  e.getKey(), actual.get(e.getKey()));
+                }
+                if ( e.getValue() != null && !e.getValue().equals(actual.get(e.getKey()))) {
+                    LOGGER.info("Expected value for {}  is {} but actual is {}",  new Object[]{e.getKey(), e.getValue(), actual.get(e.getKey())});
+                    LOGGER.info("Expected value for {}  is {} but actual is {}",  new Object[]{e.getKey(), e.getValue().getClass(), actual.get(e.getKey()) == null ? null : actual.get(e.getKey()).getClass()});
+                }
+                Assert.assertEquals(e.getValue(), actual.get(e.getKey()));
+            }
+        }
+        LOGGER.info("Maps equal ok");
+    }
+
+    private static void checkEquals(List<Object> expected, List<Object> actual) {
+        Assert.assertEquals(expected.size(), actual.size());
+        for (int i = 0; i < expected.size(); i++) {
+            Object e = expected.get(i);
+            Object a = actual.get(i);
+            if ( e instanceof Map ) {
+                checkEquals((Map<String, Object>) e, (Map<String, Object>) a);
+            } else if ( e instanceof List ) {
+                checkEquals((List<Object>) e, (List<Object>) a);
+            } else {
+                Assert.assertEquals(e,a);
+            }
+        }
+    }
+
+}
\ No newline at end of file
