Updated Branches: refs/heads/trunk 74b35bc5d -> ebe54c46b
Fix and tests for: https://issues.apache.org/jira/browse/AMQ-4596 Updates to proton 0.5 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebe54c46 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebe54c46 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebe54c46 Branch: refs/heads/trunk Commit: ebe54c46b3103b40b1428dd18df08fecc3d72386 Parents: 74b35bc Author: Timothy Bish <[email protected]> Authored: Wed Sep 4 10:26:44 2013 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Sep 4 10:26:44 2013 -0400 ---------------------------------------------------------------------- activemq-amqp/pom.xml | 5 + .../transport/amqp/AmqpProtocolConverter.java | 41 +++-- .../transport/amqp/SimpleAMQPAuthTest.java | 168 +++++++++++++++++++ .../transport/amqp/simple-auth-amqp-broker.xml | 85 ++++++++++ pom.xml | 2 +- 5 files changed, 284 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 4e7d65b..b5678a9 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -63,6 +63,11 @@ <artifactId>activemq-kahadb-store</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>activemq-jaas</artifactId> + <scope>test</scope> + </dependency> <!-- Joram JMS conformance tests --> <dependency> http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 20f7594..ff81fb7 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -72,19 +72,22 @@ import org.apache.qpid.proton.amqp.transaction.Declare; import org.apache.qpid.proton.amqp.transaction.Declared; import org.apache.qpid.proton.amqp.transaction.Discharge; import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.EndpointError; import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.ConnectionImpl; -import org.apache.qpid.proton.engine.impl.LinkImpl; +import org.apache.qpid.proton.engine.impl.EngineFactoryImpl; import org.apache.qpid.proton.engine.impl.ProtocolTracer; import org.apache.qpid.proton.engine.impl.TransportImpl; import org.apache.qpid.proton.framing.TransportFrame; @@ -120,8 +123,9 @@ class AmqpProtocolConverter { int prefetch = 100; ReentrantLock lock = new ReentrantLock(); - TransportImpl protonTransport = new TransportImpl(); - ConnectionImpl protonConnection = new ConnectionImpl(); + EngineFactory engineFactory = new EngineFactoryImpl(); + Transport protonTransport = engineFactory.createTransport(); + Connection protonConnection = engineFactory.createConnection(); public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) { this.amqpTransport = transport; @@ -131,7 +135,7 @@ class AmqpProtocolConverter { void updateTracer() { if (amqpTransport.isTrace()) { - this.protonTransport.setProtocolTracer(new ProtocolTracer() { + ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() { @Override public void receivedFrame(TransportFrame transportFrame) { if (TRACE_FRAMES.isTraceEnabled()) { @@ -415,9 +419,7 @@ class AmqpProtocolConverter { if (response.isException()) { Throwable exception = ((ExceptionResponse) response).getException(); - // TODO: figure out how to close /w an error. - // protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), - // exception.getMessage())); + protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage())); protonConnection.close(); pumpProtonToSocket(); amqpTransport.onException(IOExceptionSupport.create(exception)); @@ -729,7 +731,7 @@ class AmqpProtocolConverter { if (response.isException()) { receiver.setTarget(null); Throwable exception = ((ExceptionResponse) response).getException(); - ((LinkImpl) receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage())); + receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); receiver.close(); } else { receiver.open(); @@ -740,7 +742,7 @@ class AmqpProtocolConverter { } } catch (AmqpProtocolException exception) { receiver.setTarget(null); - ((LinkImpl) receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage())); + receiver.setCondition(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage())); receiver.close(); } } @@ -840,6 +842,7 @@ class AmqpProtocolConverter { Buffer currentBuffer; Delivery currentDelivery; + final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; public void pumpOutbound() throws Exception { while (!closed) { @@ -868,6 +871,12 @@ class AmqpProtocolConverter { final MessageDispatch md = outbound.removeFirst(); try { + if (md.getMessage() != null) { + org.apache.activemq.command.Message message = md.getMessage(); + if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { + message.setProperty(MESSAGE_FORMAT_KEY, 0); + } + } final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage(); if (jms == null) { // It's the end of browse signal. @@ -1102,7 +1111,7 @@ class AmqpProtocolConverter { SelectorParser.parse(selector); } catch (InvalidSelectorException e) { sender.setSource(null); - ((LinkImpl) sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage())); + sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); sender.close(); consumerContext.closed = true; return; @@ -1133,7 +1142,7 @@ class AmqpProtocolConverter { sender.setSource(null); Throwable exception = ((ExceptionResponse) response).getException(); String name = exception.getClass().getName(); - ((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage())); + sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } sender.open(); pumpProtonToSocket(); @@ -1185,11 +1194,11 @@ class AmqpProtocolConverter { if (response.isException()) { sender.setSource(null); Throwable exception = ((ExceptionResponse) response).getException(); - String name = exception.getClass().getName(); + Symbol condition = AmqpError.INTERNAL_ERROR; if (exception instanceof InvalidSelectorException) { - name = "amqp:invalid-field"; + condition = AmqpError.INVALID_FIELD; } - ((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage())); + sender.setCondition(new ErrorCondition(condition, exception.getMessage())); subscriptionsByConsumerId.remove(id); sender.close(); } else { @@ -1201,7 +1210,7 @@ class AmqpProtocolConverter { }); } catch (AmqpProtocolException e) { sender.setSource(null); - ((LinkImpl) sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage())); + sender.setCondition(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage())); sender.close(); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java new file mode 100644 index 0000000..f3bc037 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java @@ -0,0 +1,168 @@ +package org.apache.activemq.transport.amqp; +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.qpid.amqp_1_0.client.ConnectionClosedException; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.net.URI; + +import static org.junit.Assert.*; + +/** + * @author Kevin Earls + */ +public class SimpleAMQPAuthTest { + public static final String SIMPLE_AUTH_AMQP_BROKER_XML = "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml"; + public BrokerService brokerService; + protected static final Logger LOG = LoggerFactory.getLogger(SimpleAMQPAuthTest.class); + protected int port = 5672; + + @Before + public void setUp() throws Exception { + startBroker(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService = null; + } + } + + @Test(timeout = 10000) + public void testNoUserOrPassword() throws Exception { + try { + ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", ""); + Connection connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + LOG.error("Unexpected exception ", exception); + exception.printStackTrace(); + } + }); + connection.start(); + Thread.sleep(1000); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + fail("Expected JMSException"); + } catch (JMSException e) { + Exception linkedException = e.getLinkedException(); + if (linkedException != null && linkedException instanceof ConnectionClosedException) { + ConnectionClosedException cce = (ConnectionClosedException) linkedException; + assertEquals("Error{condition=unauthorized-access,description=User name [null] or password is invalid.}", cce.getRemoteError().toString()); + } else { + LOG.error("Unexpected Exception", e); + fail("Unexpected exception: " + e.getMessage()); + } + } + } + + @Test(timeout = 10000) + public void testUnknownUser() throws Exception { + try { + ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + Connection connection = factory.createConnection("nosuchuser", "blah"); + connection.start(); + Thread.sleep(500); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + fail("Expected JMSException"); + } catch (JMSException e) { + Exception linkedException = e.getLinkedException(); + if (linkedException != null && linkedException instanceof ConnectionClosedException) { + ConnectionClosedException cce = (ConnectionClosedException) linkedException; + assertEquals("Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}", cce.getRemoteError().toString()); + } else { + LOG.error("Unexpected Exception", e); + fail("Unexpected exception: " + e.getMessage()); + } + } + } + + @Test(timeout = 10000) + public void testKnownUserWrongPassword() throws Exception { + try { + ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + Connection connection = factory.createConnection("user", "wrongPassword"); + connection.start(); + Thread.sleep(500); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + fail("Expected JMSException"); + } catch (JMSException e) { + Exception linkedException = e.getLinkedException(); + if (linkedException != null && linkedException instanceof ConnectionClosedException) { + ConnectionClosedException cce = (ConnectionClosedException) linkedException; + assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString()); + } else { + LOG.error("Unexpected Exception", e); + fail("Unexpected exception: " + e.getMessage()); + } + } + } + + + @Test(timeout = 30000) + public void testSendReceive() throws Exception { + ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + Connection connection = factory.createConnection("user", "userPassword"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueImpl queue = new QueueImpl("queue://txqueue"); + MessageProducer p = session.createProducer(queue); + TextMessage message = null; + message = session.createTextMessage(); + String messageText = "hello sent at " + new java.util.Date().toString(); + message.setText(messageText); + p.send(message); + + // Get the message we just sent + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + Message msg = consumer.receive(5000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + TextMessage textMessage = (TextMessage) msg; + assertEquals(messageText, textMessage.getText()); + connection.close(); + } + + + protected BrokerService createBroker() throws Exception { + return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML); + } + + protected BrokerService createBroker(String uri) throws Exception { + LOG.debug(">>>>> Loading broker configuration from the classpath with URI: " + uri); + return BrokerFactory.createBroker(new URI("xbean:" + uri)); + } + + public void startBroker() throws Exception { + brokerService = createBroker(); + brokerService.start(); + brokerService.waitUntilStarted(); + } + +} + http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml b/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml new file mode 100644 index 0000000..624c649 --- /dev/null +++ b/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml @@ -0,0 +1,85 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: example --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + + <broker useJmx="true" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true" schedulePeriodForDestinationPurge="2000"> + + <destinations> + <queue physicalName="TEST.Q" /> + </destinations> + + <!-- Use a non-default port in case the default port is in use --> + <managementContext> + <managementContext connectorPort="1199"/> + </managementContext> + + <transportConnectors> + <transportConnector name="openwire" uri="vm://localhost" /> + <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/> + </transportConnectors> + + <plugins> + + <simpleAuthenticationPlugin> + <users> + <authenticationUser username="system" password="systemPassword" groups="users,admins"/> + <authenticationUser username="user" password="userPassword" groups="users"/> + <authenticationUser username="guest" password="guestPassword" groups="guests"/> + </users> + </simpleAuthenticationPlugin> + + + + <!-- lets configure a destination based authorization mechanism --> + <!-- + <authorizationPlugin> + <map> + <authorizationMap> + <authorizationEntries> + <authorizationEntry queue=">" read="admins" write="admins" admin="admins" /> + <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" /> + <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> + + <authorizationEntry queue="TEST.Q" read="guests" write="guests" /> + + <authorizationEntry topic=">" read="admins" write="admins" admin="admins" /> + <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" /> + <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> + + <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/> + </authorizationEntries> + <tempDestinationAuthorizationEntry> + <tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/> + </tempDestinationAuthorizationEntry> + </authorizationMap> + </map> + </authorizationPlugin> + --> + </plugins> + </broker> + +</beans> http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5e726fb..1564e9e 100755 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,7 @@ <p2psockets-version>1.1.2</p2psockets-version> <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version> <zookeeper-version>3.4.3</zookeeper-version> - <qpid-proton-version>0.3.0-fuse-4</qpid-proton-version> + <qpid-proton-version>0.5</qpid-proton-version> <qpid-jms-version>0.22</qpid-jms-version> <regexp-version>1.3</regexp-version> <rome-version>1.0</rome-version>
