Repository: activemq
Updated Branches:
  refs/heads/master d8c0ff141 -> 340728f2d
AMQ-5920 - use implicit broker transaction for virtual topic fanout and add concurrentSend=true option to use an executor to fanout. Combination gives a 3x reduction in fanout roundtrip for small persistent messages to 100 consumer queues Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/340728f2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/340728f2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/340728f2 Branch: refs/heads/master Commit: 340728f2d1eb4c1aa74f4002c5b16bf7a70c57b7 Parents: d8c0ff1 Author: gtully <[email protected]> Authored: Thu Aug 6 11:55:45 2015 +0100 Committer: gtully <[email protected]> Committed: Thu Aug 6 11:56:47 2015 +0100 ---------------------------------------------------------------------- .../SelectorAwareVirtualTopicInterceptor.java | 17 +-- .../broker/region/virtual/VirtualTopic.java | 17 ++- .../region/virtual/VirtualTopicInterceptor.java | 95 ++++++++++++++++- .../activemq/transaction/Transaction.java | 4 +- .../VirtualTopicDisconnectSelectorTest.java | 10 +- .../virtual/VirtualTopicFanoutPerfTest.java | 103 +++++++++++++++++++ 6 files changed, 224 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java index 0c19565..b528f40 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java +++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java @@ -40,8 +40,8 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>(); private SubQueueSelectorCacheBroker selectorCachePlugin; - public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { - super(next, prefix, postfix, local); + public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { + super(next, virtualTopic); } /** @@ -49,18 +49,7 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto * the virtual queues, hence there is no build up of unmatched messages on these destinations */ @Override - protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception { - Broker broker = context.getConnectionContext().getBroker(); - Set<Destination> destinations = broker.getDestinations(destination); - - for (Destination dest : destinations) { - if (matchesSomeConsumer(broker, message, dest)) { - dest.send(context, message.copy()); - } - } - } - - private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException { + protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException { boolean matches = false; MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(dest.getActiveMQDestination()); http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index 769c784..95fa333 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -42,6 +42,7 @@ public class VirtualTopic implements VirtualDestination { private String name = ">"; private boolean selectorAware = false; private boolean local = false; + private boolean concurrentSend = false; @Override public ActiveMQDestination getVirtualDestination() { @@ -50,8 +51,8 @@ public class VirtualTopic implements VirtualDestination { @Override public Destination intercept(Destination destination) { - return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : new VirtualTopicInterceptor( - destination, getPrefix(), getPostfix(), isLocal()); + return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, this) : + new VirtualTopicInterceptor(destination, this); } @Override @@ -168,4 +169,16 @@ public class VirtualTopic implements VirtualDestination { append(postfix).append(',').append(selectorAware). 
append(',').append(local).toString(); } + + public boolean isConcurrentSend() { + return concurrentSend; + } + + /** + * When true, dispatch to matching destinations in parallel (in multiple threads) + * @param concurrentSend + */ + public void setConcurrentSend(boolean concurrentSend) { + this.concurrentSend = concurrentSend; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index 21c1d23..7967562 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -16,12 +16,21 @@ */ package org.apache.activemq.broker.region.virtual; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.Message; import org.apache.activemq.util.LRUCache; @@ -33,13 +42,15 @@ public class VirtualTopicInterceptor extends 
DestinationFilter { private final String prefix; private final String postfix; private final boolean local; + private final boolean concurrentSend; private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>(); - public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { + public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { super(next); - this.prefix = prefix; - this.postfix = postfix; - this.local = local; + this.prefix = virtualTopic.getPrefix(); + this.postfix = virtualTopic.getPostfix(); + this.local = virtualTopic.isLocal(); + this.concurrentSend = virtualTopic.isConcurrentSend(); } public Topic getTopic() { @@ -55,6 +66,82 @@ public class VirtualTopicInterceptor extends DestinationFilter { super.send(context, message); } + @Override + protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception { + final Broker broker = context.getConnectionContext().getBroker(); + final Set<Destination> destinations = broker.getDestinations(destination); + final int numDestinations = destinations.size(); + + final LocalTransactionId localBrokerTransactionToCoalesceJournalSync = + beginLocalTransaction(numDestinations, context.getConnectionContext(), message); + try { + if (concurrentSend && numDestinations > 1) { + + final CountDownLatch concurrent = new CountDownLatch(destinations.size()); + final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>(); + final BrokerService brokerService = broker.getBrokerService(); + + for (final Destination dest : destinations) { + if (shouldDispatch(broker, message, dest)) { + brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override + public void run() { + try { + if (exceptionAtomicReference.get() == null) { + dest.send(context, message.copy()); + } + } catch (Exception e) { + 
exceptionAtomicReference.set(e); + } finally { + concurrent.countDown(); + } + } + }); + } else { + concurrent.countDown(); + } + } + concurrent.await(); + if (exceptionAtomicReference.get() != null) { + throw exceptionAtomicReference.get(); + } + + } else { + for (final Destination dest : destinations) { + if (shouldDispatch(broker, message, dest)) { + dest.send(context, message.copy()); + } + } + } + } finally { + commit(localBrokerTransactionToCoalesceJournalSync, context.getConnectionContext(), message); + } + } + + private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception { + LocalTransactionId result = null; + if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { + result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId()); + connectionContext.getBroker().beginTransaction(connectionContext, result); + connectionContext.setTransaction(connectionContext.getTransactions().get(result)); + message.setTransactionId(result); + } + return result; + } + + private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception { + if (tx != null) { + connectionContext.getBroker().commitTransaction(connectionContext, tx, true); + connectionContext.getTransactions().remove(tx); + connectionContext.setTransaction(null); + message.setTransactionId(null); + } + } + + protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException { + return true; + } + protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) { ActiveMQQueue queue; synchronized (cache) { http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java ---------------------------------------------------------------------- 
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java index 99758d1..6843871 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java @@ -74,7 +74,9 @@ public abstract class Transaction { } public void addSynchronization(Synchronization r) { - synchronizations.add(r); + synchronized (synchronizations) { + synchronizations.add(r); + } if (state == START_STATE) { state = IN_USE_STATE; } http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java index 8b95345..cf2c67e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java @@ -31,6 +31,7 @@ import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.plugin.SubQueueSelectorCacheBroker; import org.apache.activemq.spring.ConsumerBean; import org.apache.activemq.xbean.XBeanBrokerFactory; import org.slf4j.Logger; @@ -107,7 +108,7 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor assertMessagesArrived(messageList, expected ,10000); } - protected Destination getConsumerDsetination() { + protected 
ActiveMQQueue getConsumerDsetination() { return new ActiveMQQueue("Consumer.VirtualTopic.TEST"); } @@ -182,4 +183,11 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor return answer; } + + protected void startBroker() throws Exception { + super.startBroker(); + // start with a clean slate + SubQueueSelectorCacheBroker selectorCacheBroker = (SubQueueSelectorCacheBroker) broker.getBroker().getAdaptor(SubQueueSelectorCacheBroker.class); + selectorCacheBroker.deleteAllSelectorsForDestination(getConsumerDsetination().getQualifiedName()); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/340728f2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java new file mode 100644 index 0000000..90cdeea --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VirtualTopicFanoutPerfTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicFanoutPerfTest.class); + + int numConsumers = 100; + int total = 500; + BrokerService brokerService; + ConnectionFactory connectionFactory; + + @Before + public void createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.start(); + + for (DestinationInterceptor destinationInterceptor : brokerService.getDestinationInterceptors()) { + for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) { + if (virtualDestination instanceof VirtualTopic) { + ((VirtualTopic) virtualDestination).setConcurrentSend(true); + } + } + } + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy(); + zeroPrefetch.setAll(0); + 
activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch); + connectionFactory = activeMQConnectionFactory; + } + + @After + public void stopBroker() throws Exception { + brokerService.stop(); + } + + @Test + @Ignore("comparison test - concurrentSend=true virtual topic, use transaction") + public void testFanoutDuration() throws Exception { + + + Session session = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + for (int i=0; i<numConsumers; i++) { + session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST")); + } + + // create topic producer + Session producerSession = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(new ActiveMQTopic("VirtualTopic.TEST")); + + long start = System.currentTimeMillis(); + LOG.info("Starting producer: " + start); + for (int i = 0; i < total; i++) { + producer.send(producerSession.createTextMessage("message: " + i)); + } + LOG.info("Done producer, duration: " + (System.currentTimeMillis() - start) ); + + + } + + private Connection createStartAndTrackConnection() throws Exception { + Connection connection = connectionFactory.createConnection(); + connection.start(); + return connection; + } + +}
