Repository: activemq-artemis Updated Branches: refs/heads/master b9f00f73b -> cd47873f2
ARTEMIS-979 OpenWire "no-Local" consumer not working. When an OpenWire consumer is created with the 'no-local' flag, the flag is not honored: the consumer can still receive messages sent from its own connection. The fix is similar to what the Artemis client does, namely adding a 'filter' to the consumer/subscription. The difference is that with OpenWire the filter has to be added on the broker side. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e7a4d42a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e7a4d42a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e7a4d42a Branch: refs/heads/master Commit: e7a4d42a6453973cf97525d355d91dc0c93d7937 Parents: b9f00f7 Author: Howard Gao <[email protected]> Authored: Sun Feb 19 23:21:20 2017 +0800 Committer: Clebert Suconic <[email protected]> Committed: Mon Feb 20 08:17:43 2017 -0500 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 10 ++ .../core/protocol/openwire/amq/AMQConsumer.java | 17 +++ .../core/protocol/openwire/amq/AMQSession.java | 6 + .../openwire/SimpleOpenWireTest.java | 137 +++++++++++++++++++ 4 files changed, 170 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 13b8a39..da32bda 100644 --- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -156,6 +156,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private ConnectionState state; + private volatile boolean noLocal; + /** * Openwire doesn't sen transactions associated with any sessions. * It will however send beingTX / endTX as it would be doing it with XA Transactions. @@ -836,6 +838,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se disableTtl.set(false); } + public boolean isNoLocal() { + return noLocal; + } + + public void setNoLocal(boolean noLocal) { + this.noLocal = noLocal; + } + class SlowConsumerDetection implements SlowConsumerDetectionListener { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 77a1a4a..917d808 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.advisory.AdvisorySupport; import 
org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; @@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -79,6 +81,18 @@ public class AMQConsumer { public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); + if (info.isNoLocal()) { + if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) { + //tell the connection to add the property + this.session.getConnection().setNoLocal(true); + } + String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.getId().getConnectionId() + "'"; + if (selector == null) { + selector = new SimpleString(noLocalSelector); + } else { + selector = new SimpleString(info.getSelector() + " AND " + noLocalSelector); + } + } String physicalName = session.convertWildcard(openwireDestination.getPhysicalName()); @@ -201,6 +215,9 @@ public class AMQConsumer { return 0; } + if (session.getConnection().isNoLocal()) { + message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); + } dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 7cdd070..e002fd0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.IDGenerator; @@ -297,6 +298,11 @@ public class AMQSession implements SessionCallback { ServerMessage originalCoreMsg = getConverter().inbound(messageSend); + if (connection.isNoLocal() || sessInfo.getSessionId().getValue() == -1) { + //Internal session is used to send advisory messages, which are always noLocal + originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getConnectionId().getValue()); + } + /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did * not receive acks will be resent. 
(ActiveMQ broker handles this by returning a last sequence id received to * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index b406fdd..96d8ac4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -31,6 +31,8 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSession; import javax.jms.XAConnection; import javax.jms.XASession; import javax.transaction.xa.XAResource; @@ -356,6 +358,141 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } @Test + public void testTopicNoLocal() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating queue: " + topicName); + Destination dest = new ActiveMQTopic(topicName); + + MessageConsumer nolocalConsumer = session.createConsumer(dest, null, true); + MessageConsumer consumer = session.createConsumer(dest, null, false); + MessageConsumer selectorConsumer = session.createConsumer(dest,"TESTKEY = 'test'", false); + + MessageProducer producer = session.createProducer(dest); + + final String body1 = "MfromAMQ-1"; + final String body2 = "MfromAMQ-2"; + 
TextMessage msg = session.createTextMessage(body1); + producer.send(msg); + + msg = session.createTextMessage(body2); + msg.setStringProperty("TESTKEY", "test"); + producer.send(msg); + + //receive nolocal + TextMessage receivedMsg = (TextMessage) nolocalConsumer.receive(1000); + assertNull("nolocal consumer got: " + receivedMsg, receivedMsg); + + //receive normal consumer + receivedMsg = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMsg); + assertEquals(body1, receivedMsg.getText()); + + receivedMsg = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMsg); + assertEquals(body2, receivedMsg.getText()); + + assertNull(consumer.receiveNoWait()); + + //selector should only receive one + receivedMsg = (TextMessage) selectorConsumer.receive(1000); + assertNotNull(receivedMsg); + assertEquals(body2, receivedMsg.getText()); + assertEquals("test", receivedMsg.getStringProperty("TESTKEY")); + + assertNull(selectorConsumer.receiveNoWait()); + + //send from another connection + Connection anotherConn = this.factory.createConnection(); + try { + anotherConn.start(); + + Session anotherSession = anotherConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer anotherProducer = anotherSession.createProducer(dest); + TextMessage anotherMsg = anotherSession.createTextMessage(body1); + anotherProducer.send(anotherMsg); + + assertNotNull(consumer.receive(1000)); + assertNull(selectorConsumer.receive(1000)); + assertNotNull(nolocalConsumer.receive(1000)); + } finally { + anotherConn.close(); + } + + session.close(); + } + + @Test + public void testTopicNoLocalDurable() throws Exception { + connection.setClientID("forNoLocal-1"); + connection.start(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating queue: " + topicName); + Topic dest = new ActiveMQTopic(topicName); + + MessageConsumer nolocalConsumer = session.createDurableSubscriber(dest, "nolocal-subscriber1", "", 
true); + MessageConsumer consumer = session.createDurableSubscriber(dest, "normal-subscriber", null, false); + MessageConsumer selectorConsumer = session.createDurableSubscriber(dest, "selector-subscriber", "TESTKEY = 'test'", false); + + MessageProducer producer = session.createProducer(dest); + + final String body1 = "MfromAMQ-1"; + final String body2 = "MfromAMQ-2"; + TextMessage msg = session.createTextMessage(body1); + producer.send(msg); + + msg = session.createTextMessage(body2); + msg.setStringProperty("TESTKEY", "test"); + producer.send(msg); + + //receive nolocal + TextMessage receivedMsg = (TextMessage) nolocalConsumer.receive(1000); + assertNull("nolocal consumer got: " + receivedMsg, receivedMsg); + + //receive normal consumer + receivedMsg = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMsg); + assertEquals(body1, receivedMsg.getText()); + + receivedMsg = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMsg); + assertEquals(body2, receivedMsg.getText()); + + assertNull(consumer.receiveNoWait()); + + //selector should only receive one + receivedMsg = (TextMessage) selectorConsumer.receive(1000); + assertNotNull(receivedMsg); + assertEquals(body2, receivedMsg.getText()); + assertEquals("test", receivedMsg.getStringProperty("TESTKEY")); + + assertNull(selectorConsumer.receiveNoWait()); + + //send from another connection + Connection anotherConn = this.factory.createConnection(); + try { + anotherConn.start(); + + Session anotherSession = anotherConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer anotherProducer = anotherSession.createProducer(dest); + TextMessage anotherMsg = anotherSession.createTextMessage(body1); + anotherProducer.send(anotherMsg); + + assertNotNull(consumer.receive(1000)); + assertNull(selectorConsumer.receive(1000)); + assertNotNull(nolocalConsumer.receive(1000)); + } finally { + anotherConn.close(); + } + + session.close(); + } + + @Test public void testSimpleTempTopic() throws 
Exception { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
