Repository: activemq Updated Branches: refs/heads/master 540a66baa -> 9becfc0be
AMQ-5782 Added support to the Resource Adapter for SSL Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9becfc0b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9becfc0b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9becfc0b Branch: refs/heads/master Commit: 9becfc0bedb7bf2287fe7c04caf6c6b266cf52a2 Parents: 540a66b Author: Andy Taylor <andy.tayl...@gmail.com> Authored: Tue May 19 11:33:13 2015 +0100 Committer: Andy Taylor <andy.tayl...@gmail.com> Committed: Thu May 21 09:15:14 2015 +0100 ---------------------------------------------------------------------- .../activemq/ra/ActiveMQActivationSpec.java | 60 +++ .../ra/ActiveMQConnectionRequestInfo.java | 89 +++- .../activemq/ra/ActiveMQConnectionSupport.java | 48 +- .../activemq/ra/ActiveMQResourceAdapter.java | 2 +- .../activemq/ra/MessageActivationSpec.java | 10 + .../ra/SSLMAnagedConnectionFactoryTest.java | 95 ++++ .../java/org/apache/activemq/ra/SSLTest.java | 498 +++++++++++++++++++ activemq-ra/src/test/resources/client.keystore | Bin 0 -> 2197 bytes activemq-ra/src/test/resources/server.keystore | Bin 0 -> 2197 bytes activemq-rar/src/main/rar/META-INF/ra.xml | 25 + 10 files changed, 820 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java index 0c1440b..99d4d9c 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java @@ -73,6 +73,11 @@ public class ActiveMQActivationSpec 
implements MessageActivationSpec, Serializab private String maxMessagesPerSessions = "10"; private String enableBatch = "false"; private String maxMessagesPerBatch = "10"; + private String trustStore; + private String trustStorePassword; + private String keyStore; + private String keyStorePassword; + private String keyStoreKeyPassword; private RedeliveryPolicy redeliveryPolicy; private boolean useJndi; @@ -675,4 +680,59 @@ public class ActiveMQActivationSpec implements MessageActivationSpec, Serializab public boolean isUseJndi() { return useJndi; } + + public String getTrustStore() { + if (!isEmpty(trustStore)) { + return trustStore; + } + return null; + } + + public void setTrustStore(String trustStore) { + this.trustStore = trustStore; + } + + public String getTrustStorePassword() { + if (!isEmpty(trustStorePassword)) { + return trustStorePassword; + } + return null; + } + + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + public String getKeyStore() { + if (!isEmpty(keyStore)) { + return keyStore; + } + return null; + } + + public void setKeyStore(String keyStore) { + this.keyStore = keyStore; + } + + public String getKeyStorePassword() { + if (!isEmpty(keyStorePassword)) { + return keyStorePassword; + } + return null; + } + + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } + + public String getKeyStoreKeyPassword() { + if (!isEmpty(keyStoreKeyPassword)) { + return keyStoreKeyPassword; + } + return null; + } + + public void setKeyStoreKeyPassword(String keyStoreKeyPassword) { + this.keyStoreKeyPassword = keyStoreKeyPassword; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java ---------------------------------------------------------------------- diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java index 0c96c6d..7c94721 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java @@ -22,7 +22,10 @@ import javax.resource.spi.ConnectionRequestInfo; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.ActiveMQSslConnectionFactory; import org.apache.activemq.RedeliveryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Must override equals and hashCode (JCA spec 16.4) @@ -30,6 +33,7 @@ import org.apache.activemq.RedeliveryPolicy; public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Serializable, Cloneable { private static final long serialVersionUID = -5754338187296859149L; + protected Logger log = LoggerFactory.getLogger(getClass()); private String userName; private String password; @@ -39,6 +43,11 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser private RedeliveryPolicy redeliveryPolicy; private ActiveMQPrefetchPolicy prefetchPolicy; private Boolean useSessionArgs; + private String trustStore; + private String trustStorePassword; + private String keyStore; + private String keyStorePassword; + private String keyStoreKeyPassword; public ActiveMQConnectionRequestInfo copy() { try { @@ -63,7 +72,7 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser /** * Configures the given connection factory */ - public void configure(ActiveMQConnectionFactory factory) { + public void configure(ActiveMQConnectionFactory factory, MessageActivationSpec activationSpec) { if (serverUrl != null) { factory.setBrokerURL(serverUrl); } @@ -76,6 +85,37 @@ public class ActiveMQConnectionRequestInfo implements 
ConnectionRequestInfo, Ser if (prefetchPolicy != null) { factory.setPrefetchPolicy(prefetchPolicy); } + if (factory instanceof ActiveMQSslConnectionFactory) { + String trustStore = defaultValue(activationSpec == null ? null : activationSpec.getTrustStore(), getTrustStore()); + String trustStorePassword = defaultValue(activationSpec == null ? null : activationSpec.getTrustStorePassword(), getTrustStorePassword()); + String keyStore = defaultValue(activationSpec == null ? null : activationSpec.getKeyStore(), getKeyStore()); + String keyStorePassword = defaultValue(activationSpec == null ? null : activationSpec.getKeyStorePassword(), getKeyStorePassword()); + String keyStoreKeyPassword = defaultValue(activationSpec == null ? null : activationSpec.getKeyStoreKeyPassword(), getKeyStoreKeyPassword()); + ActiveMQSslConnectionFactory sslFactory = (ActiveMQSslConnectionFactory) factory; + if (trustStore != null) { + try { + sslFactory.setTrustStore(trustStore); + } catch (Exception e) { + log.warn("Unable to set TrustStore", e); + } + } + if (trustStorePassword != null) { + sslFactory.setTrustStorePassword(trustStorePassword); + } + if (keyStore != null) { + try { + sslFactory.setKeyStore(keyStore); + } catch (Exception e) { + log.warn("Unable to set KeyStore", e); + } + } + if (keyStorePassword != null) { + sslFactory.setKeyStorePassword(keyStorePassword); + } + if (keyStoreKeyPassword != null) { + sslFactory.setKeyStoreKeyPassword(keyStoreKeyPassword); + } + } } /** @@ -182,6 +222,46 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser this.clientid = clientid; } + public String getTrustStore() { + return trustStore; + } + + public void setTrustStore(String trustStore) { + this.trustStore = trustStore; + } + + public String getTrustStorePassword() { + return trustStorePassword; + } + + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + public String getKeyStore() { + return 
keyStore; + } + + public void setKeyStore(String keyStore) { + this.keyStore = keyStore; + } + + public String getKeyStorePassword() { + return keyStorePassword; + } + + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } + + public String getKeyStoreKeyPassword() { + return keyStoreKeyPassword; + } + + public void setKeyStoreKeyPassword(String keyStoreKeyPassword) { + this.keyStoreKeyPassword = keyStoreKeyPassword; + } + @Override public String toString() { return new StringBuffer("ActiveMQConnectionRequestInfo{ userName = '").append(userName).append("' ") @@ -354,4 +434,11 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser public void setUseSessionArgs(Boolean useSessionArgs) { this.useSessionArgs = useSessionArgs; } + + protected String defaultValue(String value, String defaultValue) { + if (value != null) { + return value; + } + return defaultValue; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java index 2ca98d7..8d7c9fd 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionSupport.java @@ -19,6 +19,7 @@ package org.apache.activemq.ra; import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSslConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,13 +39,15 @@ public class ActiveMQConnectionSupport { * broker. The factory is configured with the given configuration information. 
* * @param connectionRequestInfo the configuration request information + * @param activationSpec * @return the connection factory * @throws java.lang.IllegalArgumentException if the server URL given in the * configuration information is not a valid URL */ - protected ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo connectionRequestInfo) { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); - connectionRequestInfo.configure(factory); + protected ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo connectionRequestInfo, MessageActivationSpec activationSpec) { + //ActiveMQSslConnectionFactory defaults to TCP anyway + ActiveMQConnectionFactory factory = new ActiveMQSslConnectionFactory(); + connectionRequestInfo.configure(factory, activationSpec); return factory; } @@ -57,8 +60,8 @@ public class ActiveMQConnectionSupport { * @return the physical connection * @throws JMSException if the connection could not be established */ - public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo connectionRequestInfo) throws JMSException{ - return makeConnection(connectionRequestInfo, createConnectionFactory(connectionRequestInfo)); + public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo connectionRequestInfo) throws JMSException { + return makeConnection(connectionRequestInfo, createConnectionFactory(connectionRequestInfo, null)); } /** @@ -187,6 +190,41 @@ public class ActiveMQConnectionSupport { info.setServerUrl(url); } + public void setTrustStore(String trustStore) { + if ( log.isDebugEnabled() ) { + log.debug(this + ", setting [trustStore] to: " + trustStore); + } + info.setTrustStore(trustStore); + } + + public void setTrustStorePassword(String trustStorePassword) { + if ( log.isDebugEnabled() ) { + log.debug(this + ", setting [trustStorePassword] to: " + trustStorePassword); + } + info.setTrustStorePassword(trustStorePassword); + } + + public void setKeyStore(String 
keyStore) { + if ( log.isDebugEnabled() ) { + log.debug(this + ", setting [keyStore] to: " + keyStore); + } + info.setKeyStore(keyStore); + } + + public void setKeyStorePassword(String keyStorePassword) { + if ( log.isDebugEnabled() ) { + log.debug(this + ", setting [keyStorePassword] to: " + keyStorePassword); + } + info.setKeyStorePassword(keyStorePassword); + } + + public void setKeyStoreKeyPassword(String keyStoreKeyPassword) { + if ( log.isDebugEnabled() ) { + log.debug(this + ", setting [keyStoreKeyPassword] to: " + keyStoreKeyPassword); + } + info.setKeyStoreKeyPassword(keyStoreKeyPassword); + } + /** * @return user name */ http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index 855ca43..9ae948a 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -121,7 +121,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException { ActiveMQConnectionFactory cf = getConnectionFactory(); if (cf == null) { - cf = createConnectionFactory(getInfo()); + cf = createConnectionFactory(getInfo(), activationSpec); } String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName()); String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword()); http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java ---------------------------------------------------------------------- diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java index 04327d6..1b82747 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/MessageActivationSpec.java @@ -131,4 +131,14 @@ public interface MessageActivationSpec extends ActivationSpec { boolean isUseJndi(); + String getTrustStore(); + + String getTrustStorePassword(); + + String getKeyStore(); + + String getKeyStorePassword(); + + String getKeyStoreKeyPassword(); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/test/java/org/apache/activemq/ra/SSLMAnagedConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/SSLMAnagedConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/SSLMAnagedConnectionFactoryTest.java new file mode 100644 index 0000000..e4d6da4 --- /dev/null +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/SSLMAnagedConnectionFactoryTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.ra; + +import junit.framework.TestCase; +import org.apache.activemq.broker.SslBrokerService; +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.tcp.SslTransportFactory; + +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.net.ssl.KeyManager; +import javax.net.ssl.TrustManager; + +/** + * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> + */ +public class SSLMAnagedConnectionFactoryTest extends TestCase { + + private static final String DEFAULT_HOST = "ssl://0.0.0.0:61616"; + private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter(); + private ActiveMQManagedConnectionFactory managedConnectionFactory; + private ConnectionFactory connectionFactory; + private ManagedConnectionProxy connection; + private ActiveMQManagedConnection managedConnection; + private SslBrokerService broker; + private TransportConnector connector; + + /** + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + managedConnectionFactory = new ActiveMQManagedConnectionFactory(); + managedConnectionFactory.setServerUrl(DEFAULT_HOST); + managedConnectionFactory.setTrustStore("server.keystore"); + managedConnectionFactory.setTrustStorePassword("password"); + managedConnectionFactory.setKeyStore("client.keystore"); + managedConnectionFactory.setKeyStorePassword("password"); + + connectionFactory = (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);createAndStartBroker(); + } + + @Override + protected void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + public void testSSLManagedConnection() 
throws Exception { + connection = (ManagedConnectionProxy)connectionFactory.createConnection(); + managedConnection = connection.getManagedConnection(); + //do some stuff + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue t = session.createQueue("TEST"); + MessageProducer producer = session.createProducer(t); + producer.send(session.createTextMessage("test message.")); + managedConnection.destroy(); + connection.close(); + } + + private void createAndStartBroker() throws Exception { + broker = new SslBrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(false); + broker.setBrokerName("BROKER"); + KeyManager[] km = SSLTest.getKeyManager(); + TrustManager[] tm = SSLTest.getTrustManager(); + connector = broker.addSslConnector(DEFAULT_HOST, km, tm, null); + broker.start(); + broker.waitUntilStarted(); // for client side + SslTransportFactory sslFactory = new SslTransportFactory(); + SslContext ctx = new SslContext(km, tm, null); + SslContext.setCurrentSslContext(ctx); + TransportFactory.registerTransportFactory("ssl", sslFactory); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/test/java/org/apache/activemq/ra/SSLTest.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/SSLTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/SSLTest.java new file mode 100644 index 0000000..042c09e --- /dev/null +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/SSLTest.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.ra; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQSslConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.SslBrokerService; +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.tcp.SslTransportFactory; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.resource.ResourceException; +import javax.resource.spi.BootstrapContext; +import javax.resource.spi.UnavailableException; +import javax.resource.spi.XATerminator; +import javax.resource.spi.endpoint.MessageEndpoint; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.resource.spi.work.ExecutionContext; +import javax.resource.spi.work.Work; +import javax.resource.spi.work.WorkException; +import javax.resource.spi.work.WorkListener; +import 
javax.resource.spi.work.WorkManager; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.security.KeyStore; +import java.util.Timer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class SSLTest extends TestCase { + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + + long txGenerator = System.currentTimeMillis(); + + private static final String BIND_ADDRESS = "ssl://0.0.0.0:61616"; + + private SslBrokerService broker; + + private TransportConnector connector; + + @Override + protected void setUp() throws Exception { + createAndStartBroker(); + } + + @Override + protected void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + private void createAndStartBroker() throws Exception { + broker = new SslBrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(false); + broker.setBrokerName("BROKER"); + KeyManager[] km = getKeyManager(); + TrustManager[] tm = getTrustManager(); + connector = broker.addSslConnector(BIND_ADDRESS, km, tm, null); + broker.start(); + broker.waitUntilStarted(); // for client side + SslTransportFactory sslFactory = new SslTransportFactory(); + SslContext ctx = new SslContext(km, tm, null); + SslContext.setCurrentSslContext(ctx); + TransportFactory.registerTransportFactory("ssl", sslFactory); + } + + private static final class StubBootstrapContext implements BootstrapContext { + public WorkManager getWorkManager() { 
+ return new WorkManager() { + public void doWork(Work work) throws WorkException { + new Thread(work).start(); + } + + public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException { + new Thread(work).start(); + } + + public long startWork(Work work) throws WorkException { + new Thread(work).start(); + return 0; + } + + public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException { + new Thread(work).start(); + return 0; + } + + public void scheduleWork(Work work) throws WorkException { + new Thread(work).start(); + } + + public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException { + new Thread(work).start(); + } + }; + } + + public XATerminator getXATerminator() { + return null; + } + + public Timer createTimer() throws UnavailableException { + return null; + } + } + + public class StubMessageEndpoint implements MessageEndpoint, MessageListener { + public int messageCount; + public XAResource xaresource; + public Xid xid; + + public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException { + try { + if (xid == null) { + xid = createXid(); + } + xaresource.start(xid, 0); + } catch (Throwable e) { + throw new ResourceException(e); + } + } + + public void afterDelivery() throws ResourceException { + try { + xaresource.end(xid, 0); + xaresource.prepare(xid); + xaresource.commit(xid, false); + } catch (Throwable e) { + throw new ResourceException(e); + } + } + + public void release() { + } + + public void onMessage(Message message) { + messageCount++; + } + + } + + public void testMessageDeliveryUsingSSLTruststoreOnly() throws Exception { + SSLContext context = SSLContext.getInstance("TLS"); + context.init(getKeyManager(), getTrustManager(), null); + makeSSLConnection(context, null, connector); + ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616"); + 
factory.setTrustStore("server.keystore"); + factory.setTrustStorePassword("password"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST"))); + + ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); + adapter.setServerUrl("ssl://localhost:61616"); + adapter.setTrustStore("server.keystore"); + adapter.setTrustStorePassword("password"); + adapter.setQueuePrefetch(1); + adapter.start(new StubBootstrapContext()); + + final CountDownLatch messageDelivered = new CountDownLatch(1); + + final StubMessageEndpoint endpoint = new StubMessageEndpoint() { + public void onMessage(Message message) { + super.onMessage(message); + messageDelivered.countDown(); + } + + ; + }; + + ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); + activationSpec.setDestinationType(Queue.class.getName()); + activationSpec.setDestination("TEST"); + activationSpec.setResourceAdapter(adapter); + activationSpec.validate(); + + MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() { + public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { + endpoint.xaresource = resource; + return endpoint; + } + + public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return true; + } + }; + + // Activate an Endpoint + adapter.endpointActivation(messageEndpointFactory, activationSpec); + + ActiveMQMessage msg = (ActiveMQMessage) advisory.receive(1000); + if (msg != null) { + assertEquals("Prefetch size hasn't been set", 1, ((ConsumerInfo) msg.getDataStructure()).getPrefetchSize()); + } else { + fail("Consumer hasn't been created"); + } + + // Send the broker a message to that endpoint + MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); + 
producer.send(session.createTextMessage("Hello!")); + + connection.close(); + + // Wait for the message to be delivered. + assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS)); + + // Shut the Endpoint down. + adapter.endpointDeactivation(messageEndpointFactory, activationSpec); + adapter.stop(); + + } + + public void testMessageDeliveryUsingSSLTruststoreAndKeystore() throws Exception { + SSLContext context = SSLContext.getInstance("TLS"); + context.init(getKeyManager(), getTrustManager(), null); + makeSSLConnection(context, null, connector); + ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616"); + factory.setTrustStore("server.keystore"); + factory.setTrustStorePassword("password"); + factory.setKeyStore("client.keystore"); + factory.setKeyStorePassword("password"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST"))); + + ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); + adapter.setServerUrl("ssl://localhost:61616"); + adapter.setTrustStore("server.keystore"); + adapter.setTrustStorePassword("password"); + adapter.setKeyStore("client.keystore"); + adapter.setKeyStorePassword("password"); + adapter.setQueuePrefetch(1); + adapter.start(new StubBootstrapContext()); + + final CountDownLatch messageDelivered = new CountDownLatch(1); + + final StubMessageEndpoint endpoint = new StubMessageEndpoint() { + public void onMessage(Message message) { + super.onMessage(message); + messageDelivered.countDown(); + } + + ; + }; + + ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); + activationSpec.setDestinationType(Queue.class.getName()); + activationSpec.setDestination("TEST"); + activationSpec.setResourceAdapter(adapter); + activationSpec.validate(); + + 
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() { + public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { + endpoint.xaresource = resource; + return endpoint; + } + + public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return true; + } + }; + + // Activate an Endpoint + adapter.endpointActivation(messageEndpointFactory, activationSpec); + + ActiveMQMessage msg = (ActiveMQMessage) advisory.receive(1000); + if (msg != null) { + assertEquals("Prefetch size hasn't been set", 1, ((ConsumerInfo) msg.getDataStructure()).getPrefetchSize()); + } else { + fail("Consumer hasn't been created"); + } + + // Send the broker a message to that endpoint + MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); + producer.send(session.createTextMessage("Hello!")); + + connection.close(); + + // Wait for the message to be delivered. + assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS)); + + // Shut the Endpoint down. 
+ adapter.endpointDeactivation(messageEndpointFactory, activationSpec); + adapter.stop(); + + } + + /** Verifies message delivery to an endpoint whose SSL trust store and key store are configured on the ActiveMQActivationSpec; the resource adapter itself is given only the ssl server URL and a queue prefetch of 1. */ public void testMessageDeliveryUsingSSLTruststoreAndKeystoreOverrides() throws Exception { + /* First perform a bare TLS handshake with the broker connector using the key and trust managers built by this test. */ SSLContext context = SSLContext.getInstance("TLS"); + context.init(getKeyManager(), getTrustManager(), null); + makeSSLConnection(context, null, connector); + /* Plain JMS client connection, used below to watch consumer advisories and to send the test message. */ ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616"); + factory.setTrustStore("server.keystore"); + factory.setTrustStorePassword("password"); + factory.setKeyStore("client.keystore"); + factory.setKeyStorePassword("password"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + /* Subscribe to consumer advisories for queue TEST so the consumer created by the endpoint activation can be observed. */ MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST"))); + + ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); + adapter.setServerUrl("ssl://localhost:61616"); + adapter.setQueuePrefetch(1); + adapter.start(new StubBootstrapContext()); + + final CountDownLatch messageDelivered = new CountDownLatch(1); + + /* Endpoint stub that releases the latch once a message has been handed to it. */ final StubMessageEndpoint endpoint = new StubMessageEndpoint() { + public void onMessage(Message message) { + super.onMessage(message); + messageDelivered.countDown(); + } + + /* NOTE(review): the lone semicolon below is an empty declaration and has no effect. */ ; + }; + + ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); + activationSpec.setDestinationType(Queue.class.getName()); + activationSpec.setDestination("TEST"); + activationSpec.setResourceAdapter(adapter); + /* The SSL stores are set on the activation spec, not on the adapter. */ activationSpec.setTrustStore("server.keystore"); + activationSpec.setTrustStorePassword("password"); + activationSpec.setKeyStore("client.keystore"); + activationSpec.setKeyStorePassword("password"); + activationSpec.validate(); + + /* Factory that always hands back the single stub endpoint, recording the XAResource it was given. */ MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() { + public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { + endpoint.xaresource 
= resource; + return endpoint; + } + + /* Every delivery is reported as transacted. */ public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return true; + } + }; + + // Activate an Endpoint + adapter.endpointActivation(messageEndpointFactory, activationSpec); + + /* Wait up to 1000 ms for the consumer advisory; its ConsumerInfo is expected to carry the prefetch of 1 set on the adapter. */ ActiveMQMessage msg = (ActiveMQMessage) advisory.receive(1000); + if (msg != null) { + assertEquals("Prefetch size hasn't been set", 1, ((ConsumerInfo) msg.getDataStructure()).getPrefetchSize()); + } else { + fail("Consumer hasn't been created"); + } + + // Send the broker a message to that endpoint + MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); + producer.send(session.createTextMessage("Hello!")); + + /* NOTE(review): close() is not in a finally block, so the client connection stays open when an earlier assertion fails. */ connection.close(); + + // Wait for the message to be delivered. + assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS)); + + // Shut the Endpoint down. + adapter.endpointDeactivation(messageEndpointFactory, activationSpec); + adapter.stop(); + + } + + + /** Creates a new Xid with format id 86. The global transaction id and the branch qualifier are the same byte array: the long value of the pre-incremented txGenerator counter as written by DataOutputStream.writeLong. */ public Xid createXid() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + + } + + /** Loads the key store file TRUST_KEYSTORE (store type KEYSTORE_TYPE, null password) and returns the trust managers of a default-algorithm TrustManagerFactory initialised from it. NOTE(review): the FileInputStream passed to load is never closed. */ public static TrustManager[] getTrustManager() throws Exception { + TrustManager[] trustStoreManagers = null; + KeyStore trustedCertStore = KeyStore.getInstance(KEYSTORE_TYPE); + + trustedCertStore.load(new FileInputStream(TRUST_KEYSTORE), null); + TrustManagerFactory tmf = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + + tmf.init(trustedCertStore); + trustStoreManagers = tmf.getTrustManagers(); + return trustStoreManagers; + } + + /** Builds key managers from the bytes of the SERVER_KEYSTORE file, using PASSWORD both to open the store and to initialise the KeyManagerFactory. Returns null when the file bytes are null or empty. NOTE(review): the bytes come from a helper named loadClientCredential although the file read is SERVER_KEYSTORE. */ public static KeyManager[] getKeyManager() throws Exception { + KeyManagerFactory kmf = 
+ KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyStore ks = KeyStore.getInstance(KEYSTORE_TYPE); + KeyManager[] keystoreManagers = null; + + byte[] sslCert = loadClientCredential(SERVER_KEYSTORE); + + + if (sslCert != null && sslCert.length > 0) { + ByteArrayInputStream bin = new ByteArrayInputStream(sslCert); + ks.load(bin, PASSWORD.toCharArray()); + kmf.init(ks, PASSWORD.toCharArray()); + keystoreManagers = kmf.getKeyManagers(); + } + return keystoreManagers; + } + + /** Reads the named file fully into a byte array using a 512 byte buffer; returns null when fileName is null. NOTE(review): the stream is closed only on the success path, so it leaks if read or write throws. */ private static byte[] loadClientCredential(String fileName) throws IOException { + if (fileName == null) { + return null; + } + FileInputStream in = new FileInputStream(fileName); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buf = new byte[512]; + int i = in.read(buf); + while (i > 0) { + out.write(buf, 0, i); + i = in.read(buf); + } + in.close(); + return out.toByteArray(); + } + + /** Opens an SSL socket to localhost on the port of the given connector using the supplied context, applies enabledSuites when it is not null, sets a 5000 ms socket timeout and runs the handshake. NOTE(review): the socket is never closed, the SSLSession local is never read, and the two extra exception types in the throws clause are already covered by Exception. */ private void makeSSLConnection(SSLContext context, String enabledSuites[], TransportConnector connector) throws Exception, + UnknownHostException, SocketException { + SSLSocket sslSocket = (SSLSocket) context.getSocketFactory().createSocket("localhost", connector.getUri().getPort()); + + if (enabledSuites != null) { + sslSocket.setEnabledCipherSuites(enabledSuites); + } + sslSocket.setSoTimeout(5000); + + SSLSession session = sslSocket.getSession(); + sslSocket.startHandshake(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/test/resources/client.keystore ---------------------------------------------------------------------- diff --git a/activemq-ra/src/test/resources/client.keystore b/activemq-ra/src/test/resources/client.keystore new file mode 100755 index 0000000..a96e55c Binary files /dev/null and b/activemq-ra/src/test/resources/client.keystore differ http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-ra/src/test/resources/server.keystore ---------------------------------------------------------------------- 
diff --git a/activemq-ra/src/test/resources/server.keystore b/activemq-ra/src/test/resources/server.keystore new file mode 100755 index 0000000..0d549fc Binary files /dev/null and b/activemq-ra/src/test/resources/server.keystore differ http://git-wip-us.apache.org/repos/asf/activemq/blob/9becfc0b/activemq-rar/src/main/rar/META-INF/ra.xml ---------------------------------------------------------------------- diff --git a/activemq-rar/src/main/rar/META-INF/ra.xml b/activemq-rar/src/main/rar/META-INF/ra.xml index af0279e..2ff6657 100644 --- a/activemq-rar/src/main/rar/META-INF/ra.xml +++ b/activemq-rar/src/main/rar/META-INF/ra.xml @@ -80,6 +80,31 @@ <config-property-type>java.lang.Boolean</config-property-type> <config-property-value>false</config-property-value> </config-property> + <config-property> + <description>The location of a Trust Store to use with the Connection Factory</description> + <config-property-name>TrustStore</config-property-name> + <config-property-type>java.lang.String</config-property-type> + </config-property> + <config-property> + <description>The password for the Trust Store</description> + <config-property-name>TrustStorePassword</config-property-name> + <config-property-type>java.lang.String</config-property-type> + </config-property> + <config-property> + <description>The location of a Key Store to use with the Connection Factory</description> + <config-property-name>KeyStore</config-property-name> + <config-property-type>java.lang.String</config-property-type> + </config-property> + <config-property> + <description>The password for the Key Store</description> + <config-property-name>KeyStorePassword</config-property-name> + <config-property-type>java.lang.String</config-property-type> + </config-property> + <config-property> + <description>The Key password for the Key Store</description> + <config-property-name>KeyStoreKeyPassword</config-property-name> + <config-property-type>java.lang.String</config-property-type> + </config-property> 
<!-- NOTE disable the following property if you do not wish to deploy an embedded broker --> <config-property>