Updated Branches: refs/heads/trunk e90ce1aab -> 468e69765
Fix for https://issues.apache.org/jira/browse/AMQ-4766 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/468e6976 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/468e6976 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/468e6976 Branch: refs/heads/trunk Commit: 468e69765145ddad199963260e4774d179ad5555 Parents: e000471 Author: rajdavies <[email protected]> Authored: Thu Oct 10 20:29:40 2013 +0100 Committer: rajdavies <[email protected]> Committed: Thu Oct 10 20:30:01 2013 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/jmx/QueueView.java | 38 +++++++++ .../activemq/broker/jmx/QueueViewMBean.java | 29 +++++++ .../apache/activemq/broker/region/Queue.java | 30 +------ .../region/group/CachedMessageGroupMap.java | 87 ++++++++++++++++++++ .../group/CachedMessageGroupMapFactory.java | 33 ++++++++ .../broker/region/group/GroupFactoryFinder.java | 39 +++++++++ .../region/group/MessageGroupHashBucket.java | 35 +++++++- .../broker/region/group/MessageGroupMap.java | 11 +++ .../region/group/SimpleMessageGroupMap.java | 21 +++++ .../region/group/SimpleMessageGroupSet.java | 4 + .../broker/region/policy/PolicyEntry.java | 20 ++++- .../services/org/apache/activemq/groups/bucket | 17 ++++ .../services/org/apache/activemq/groups/cached | 17 ++++ .../services/org/apache/activemq/groups/simple | 17 ++++ 14 files changed, 365 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index 3a630c8..76d82a3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.jmx; +import java.util.Map; + import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; import javax.jms.JMSException; @@ -188,4 +190,40 @@ public class QueueView extends DestinationView implements QueueViewMBean { } return false; } + + /** + * @return a Map of groupNames and ConsumerIds + */ + @Override + public Map<String, String> getMessageGroups() { + Queue queue = (Queue) destination; + return queue.getMessageGroupOwners().getGroups(); + } + + /** + * @return the message group type implementation (simple,bucket,cached) + */ + @Override + public String getMessageGroupType() { + Queue queue = (Queue) destination; + return queue.getMessageGroupOwners().getType(); + } + + /** + * remove a message group = has the effect of rebalancing group + */ + @Override + public void removeMessageGroup(@MBeanInfo("groupName") String groupName) { + Queue queue = (Queue) destination; + queue.getMessageGroupOwners().removeGroup(groupName); + } + + /** + * remove all the message groups - will rebalance all message groups across consumers + */ + @Override + public void removeAllMessageGroups() { + Queue queue = (Queue) destination; + queue.getMessageGroupOwners().removeAll(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java index 22ce661..3f99162 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.jmx; +import java.util.Map; + import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; @@ -180,4 +182,31 @@ public interface QueueViewMBean extends DestinationViewMBean { */ @MBeanInfo("Caching is enabled") boolean isCacheEnabled(); + + + /** + * @return a Map of groupNames and ConsumerIds + */ + @MBeanInfo("Map of groupNames and ConsumerIds") + Map<String,String> getMessageGroups(); + + /** + * @return the message group type implementation (simple,bucket,cached) + */ + @MBeanInfo("group implementation (simple,bucket,cached)") + String getMessageGroupType(); + + /** + * remove a message group = has the effect of rebalancing group + * @param groupName + */ + + @MBeanInfo("remove a message group by its groupName") + void removeMessageGroup(@MBeanInfo("groupName")String groupName); + + /** + * remove all the message groups - will rebalance all message groups across consumers + */ + @MBeanInfo("emove all the message groups - will rebalance all message groups across consumers") + void removeAllMessageGroups(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 6f4e2fa..7713d71 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -17,18 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -46,7 +35,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.ResourceAllocationException; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -56,24 +44,14 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PrioritizedPendingList; import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; -import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; +import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.util.InsertionCountList; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerAck; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.Response; +import org.apache.activemq.command.*; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; @@ -114,7 +92,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { protected PendingList redeliveredWaitingDispatch = new OrderedPendingList(); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); - private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); + private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); final Lock sendLock = new ReentrantLock(); private ExecutorService executor; private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>(); http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java new file mode 100644 index 0000000..7829ec4 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.group; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.memory.LRUMap; + +/** + * A simple implementation which tracks every individual GroupID value in a LRUCache + * + * + */ +public class CachedMessageGroupMap implements MessageGroupMap { + private LRUMap<String, ConsumerId> cache = new LRUMap<String, ConsumerId>(1024); + + public synchronized void put(String groupId, ConsumerId consumerId) { + cache.put(groupId, consumerId); + } + + public synchronized ConsumerId get(String groupId) { + return cache.get(groupId); + } + + public synchronized ConsumerId removeGroup(String groupId) { + return cache.remove(groupId); + } + + public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) { + SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet(); + Map<String,ConsumerId> map = new HashMap<String, ConsumerId>(); + map.putAll(cache); + for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) { + String group = iter.next(); + ConsumerId owner = map.get(group); + if (owner.equals(consumerId)) { + ownedGroups.add(group); + } + } + for (String group:ownedGroups.getUnderlyingSet()){ + cache.remove(group); + } + return ownedGroups; + } + + + @Override + public synchronized void removeAll(){ + cache.clear(); + } + + @Override + public synchronized Map<String, String> getGroups() { + Map<String,String> result = new HashMap<String,String>(); + for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){ + result.put(entry.getKey(),entry.getValue().toString()); + } + return result; + } + + @Override + public String getType() { + return "cached"; + } + + public String toString() { + return "message groups: " + cache.size(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java new file mode 100644 index 0000000..7387c5a --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.group; + +/** + * A factory to create instances of {@link org.apache.activemq.broker.region.group.SimpleMessageGroupMap} when implementing the + * <a href="http://activemq.apache.org/message-groups.html">Message Groups</a> functionality. + * + * @org.apache.xbean.XBean + * + * + */ +public class CachedMessageGroupMapFactory implements MessageGroupMapFactory { + + public MessageGroupMap createMessageGroupMap() { + return new CachedMessageGroupMap(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java new file mode 100644 index 0000000..168804f --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.group; + +import java.io.IOException; + +import org.apache.activemq.util.FactoryFinder; +import org.apache.activemq.util.IOExceptionSupport; + +public class GroupFactoryFinder { + private static final FactoryFinder GROUP_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/groups/"); + + private GroupFactoryFinder() { + } + + public static MessageGroupMapFactory createMessageGroupMapFactory(String type) throws IOException { + try { + return (MessageGroupMapFactory)GROUP_FACTORY_FINDER.newInstance(type); + } catch (Throwable e) { + throw IOExceptionSupport.create("Could not load " + type + " factory:" + e, e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java index 523d350..c36f949 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.broker.region.group; +import java.util.Map; + import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.memory.LRUMap; /** * Uses hash-code buckets to associate consumers with sets of message group IDs. @@ -27,30 +30,37 @@ public class MessageGroupHashBucket implements MessageGroupMap { private final int bucketCount; private final ConsumerId[] consumers; + private LRUMap<String,String>cache=new LRUMap<String,String>(64); public MessageGroupHashBucket(int bucketCount) { this.bucketCount = bucketCount; this.consumers = new ConsumerId[bucketCount]; } - public void put(String groupId, ConsumerId consumerId) { + public synchronized void put(String groupId, ConsumerId consumerId) { int bucket = getBucketNumber(groupId); consumers[bucket] = consumerId; + if (consumerId != null){ + cache.put(groupId,consumerId.toString()); + } } - public ConsumerId get(String groupId) { + public synchronized ConsumerId get(String groupId) { int bucket = getBucketNumber(groupId); + //excersise cache + cache.get(groupId); return consumers[bucket]; } - public ConsumerId removeGroup(String groupId) { + public synchronized ConsumerId removeGroup(String groupId) { int bucket = getBucketNumber(groupId); ConsumerId answer = consumers[bucket]; consumers[bucket] = null; + cache.remove(groupId); return answer; } - public MessageGroupSet removeConsumer(ConsumerId consumerId) { + public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) { MessageGroupSet answer = null; for (int i = 0; i < consumers.length; i++) { ConsumerId owner = consumers[i]; @@ -66,6 +76,23 @@ public class MessageGroupHashBucket implements MessageGroupMap { return answer; } + public synchronized void removeAll(){ + for (int i =0; i < consumers.length; i++){ + consumers[i] = null; + } + } + + @Override + public Map<String, String> getGroups() { + return cache; + } + + @Override + public String getType() { + return "bucket"; + } + + public String toString() { int count = 0; for (int i = 0; i < consumers.length; i++) { http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java index 9d37542..c952c94 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.region.group; +import java.util.Map; + import org.apache.activemq.command.ConsumerId; /** @@ -33,4 +35,13 @@ public interface MessageGroupMap { MessageGroupSet removeConsumer(ConsumerId consumerId); + void removeAll(); + + /** + * @return a map of group names and associated consumer Id + */ + Map<String,String> getGroups(); + + String getType(); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java index 737ce3a..e3fd4ed 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.group; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -57,6 +58,26 @@ public class SimpleMessageGroupMap implements MessageGroupMap { return ownedGroups; } + + @Override + public void removeAll(){ + map.clear(); + } + + @Override + public Map<String, String> getGroups() { + Map<String,String> result = new HashMap<String,String>(); + for (Map.Entry<String,ConsumerId>entry:map.entrySet()){ + result.put(entry.getKey(),entry.getValue().toString()); + } + return result; + } + + @Override + public String getType() { + return "simple"; + } + public String toString() { return "message groups: " + map.size(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java index 7b29c14..91f9713 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java @@ -36,4 +36,8 @@ public class SimpleMessageGroupSet implements MessageGroupSet { set.add(group); } + protected Set<String> getUnderlyingSet(){ + return set; + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index e6ae512..c219b19 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -24,12 +24,11 @@ import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.QueueBrowserSubscription; import org.apache.activemq.broker.region.QueueSubscription; -import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; +import org.apache.activemq.broker.region.group.GroupFactoryFinder; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.filter.DestinationMapEntry; import org.apache.activemq.network.NetworkBridgeFilterFactory; @@ -54,6 +53,7 @@ public class PolicyEntry extends DestinationMapEntry { private PendingMessageLimitStrategy pendingMessageLimitStrategy; private MessageEvictionStrategy messageEvictionStrategy; private long memoryLimit; + private String messageGroupMapFactoryType = "cached"; private MessageGroupMapFactory messageGroupMapFactory; private PendingQueueMessageStoragePolicy pendingQueuePolicy; private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; @@ -395,7 +395,11 @@ public class PolicyEntry extends DestinationMapEntry { public MessageGroupMapFactory getMessageGroupMapFactory() { if (messageGroupMapFactory == null) { - messageGroupMapFactory = new MessageGroupHashBucketFactory(); + try { + messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType()); + }catch(Exception e){ + LOG.error("Failed to create message group Factory ",e); + } } return messageGroupMapFactory; } @@ -410,6 +414,16 @@ public class PolicyEntry extends DestinationMapEntry { this.messageGroupMapFactory = messageGroupMapFactory; } + + public String getMessageGroupMapFactoryType() { + return messageGroupMapFactoryType; + } + + public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) { + this.messageGroupMapFactoryType = messageGroupMapFactoryType; + } + + /** * @return the pendingDurableSubscriberPolicy */ http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket new file mode 100644 index 0000000..5d8d791 --- /dev/null +++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached new file mode 100644 index 0000000..9237273 --- /dev/null +++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple new file mode 100644 index 0000000..30591d7 --- /dev/null +++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.broker.region.group.SimpleMessageGroupMapFactory
