Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 5c66343bc -> a12c21061


https://issues.apache.org/jira/browse/AMQ-6091

The JavaRuntimeConfigurationBroker can now apply just a subset of policy
properties retrospectively to existing destinations, instead of always
re-applying every property of the updated policy.

(cherry picked from commit a253ad3c71a07bb4d1883fe84be217af9855cfc6)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a12c2106
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a12c2106
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a12c2106

Branch: refs/heads/activemq-5.13.x
Commit: a12c21061ebc4cca9757d497f90ce00145c50968
Parents: 5c66343
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Wed Dec 16 20:50:48 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Fri Dec 18 18:22:19 2015 +0000

----------------------------------------------------------------------
 .../broker/region/policy/PolicyEntry.java       | 173 +++++--
 .../java/JavaRuntimeConfigurationBroker.java    |  22 +-
 .../activemq/plugin/util/PolicyEntryUtil.java   |  34 +-
 .../activemq/java/JavaPolicyEntryTest.java      | 468 ++++++++++++++++++-
 4 files changed, 639 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a12c2106/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 97d9155..01343a2 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import java.util.Set;
+
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.BaseDestination;
@@ -135,18 +137,48 @@ public class PolicyEntry extends DestinationMapEntry {
     }
 
     public void update(Queue queue) {
-        baseUpdate(queue);
-        if (memoryLimit > 0) {
+        update(queue, null);
+    }
+
+    /**
+     * Update a queue with this policy.  Only apply properties that
+     * match the includedProperties list.  Not all properties are eligible
+     * to be updated.
+     *
+     * If includedProperties is null then all of the properties will be set as
+     * isUpdate will return true
+     * @param queue
+     * @param includedProperties
+     */
+    public void update(Queue queue, Set<String> includedProperties) {
+        baseUpdate(queue, includedProperties);
+        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
             queue.getMemoryUsage().setLimit(memoryLimit);
         }
-        queue.setUseConsumerPriority(isUseConsumerPriority());
-        queue.setStrictOrderDispatch(isStrictOrderDispatch());
-        queue.setOptimizedDispatch(isOptimizedDispatch());
-        queue.setLazyDispatch(isLazyDispatch());
-        queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
-        
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
-        
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
-        queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
+        if (isUpdate("useConsumerPriority", includedProperties)) {
+            queue.setUseConsumerPriority(isUseConsumerPriority());
+        }
+        if (isUpdate("strictOrderDispatch", includedProperties)) {
+            queue.setStrictOrderDispatch(isStrictOrderDispatch());
+        }
+        if (isUpdate("optimizedDispatch", includedProperties)) {
+            queue.setOptimizedDispatch(isOptimizedDispatch());
+        }
+        if (isUpdate("lazyDispatch", includedProperties)) {
+            queue.setLazyDispatch(isLazyDispatch());
+        }
+        if (isUpdate("timeBeforeDispatchStarts", includedProperties)) {
+            queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
+        }
+        if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) {
+            
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
+        }
+        if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) {
+            
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
+        }
+        if (isUpdate("persistJMSRedelivered", includedProperties)) {
+            queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
+        }
     }
 
     public void configure(Broker broker,Topic topic) {
@@ -167,42 +199,100 @@ public class PolicyEntry extends DestinationMapEntry {
     }
 
     public void update(Topic topic) {
-        baseUpdate(topic);
-        if (memoryLimit > 0) {
+        update(topic, null);
+    }
+
+    //If includedProperties is null then all of the properties will be set as
+    //isUpdate will return true
+    public void update(Topic topic, Set<String> includedProperties) {
+        baseUpdate(topic, includedProperties);
+        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
             topic.getMemoryUsage().setLimit(memoryLimit);
         }
-        topic.setLazyDispatch(isLazyDispatch());
+        if (isUpdate("lazyDispatch", includedProperties)) {
+            topic.setLazyDispatch(isLazyDispatch());
+        }
     }
 
     // attributes that can change on the fly
     public void baseUpdate(BaseDestination destination) {
-        destination.setProducerFlowControl(isProducerFlowControl());
-        destination.setAlwaysRetroactive(isAlwaysRetroactive());
-        
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
-
-        destination.setMaxPageSize(getMaxPageSize());
-        destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
-
-        destination.setMinimumMessageSize((int) getMinimumMessageSize());
-        destination.setMaxExpirePageSize(getMaxExpirePageSize());
-        
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
-        destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
-
-        destination.setGcIfInactive(isGcInactiveDestinations());
-        destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
-        destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
-        destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
-        destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
-        
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
-
-        destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
-        destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
-        
destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
-        destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
-        destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
-        destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
-        destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
-        
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
+        baseUpdate(destination, null);
+    }
+
+    // attributes that can change on the fly
+    //If includedProperties is null then all of the properties will be set as
+    //isUpdate will return true
+    public void baseUpdate(BaseDestination destination, Set<String> 
includedProperties) {
+        if (isUpdate("producerFlowControl", includedProperties)) {
+            destination.setProducerFlowControl(isProducerFlowControl());
+        }
+        if (isUpdate("alwaysRetroactive", includedProperties)) {
+            destination.setAlwaysRetroactive(isAlwaysRetroactive());
+        }
+        if (isUpdate("blockedProducerWarningInterval", includedProperties)) {
+            
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
+        }
+        if (isUpdate("maxPageSize", includedProperties)) {
+            destination.setMaxPageSize(getMaxPageSize());
+        }
+        if (isUpdate("maxBrowsePageSize", includedProperties)) {
+            destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
+        }
+
+        if (isUpdate("minimumMessageSize", includedProperties)) {
+            destination.setMinimumMessageSize((int) getMinimumMessageSize());
+        }
+        if (isUpdate("maxExpirePageSize", includedProperties)) {
+            destination.setMaxExpirePageSize(getMaxExpirePageSize());
+        }
+        if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) {
+            
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+        }
+        if (isUpdate("storeUsageHighWaterMark", includedProperties)) {
+            
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
+        }
+        if (isUpdate("gcInactiveDestinations", includedProperties)) {
+            destination.setGcIfInactive(isGcInactiveDestinations());
+        }
+        if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
+            destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
+        }
+        if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) {
+            
destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
+        }
+        if (isUpdate("reduceMemoryFootprint", includedProperties)) {
+            destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
+        }
+        if (isUpdate("doOptimizeMessageStore", includedProperties)) {
+            
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
+        }
+        if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) 
{
+            
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
+        }
+        if (isUpdate("advisoryForConsumed", includedProperties)) {
+            destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
+        }
+        if (isUpdate("advisoryForDelivery", includedProperties)) {
+            destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
+        }
+        if (isUpdate("advisoryForDiscardingMessages", includedProperties)) {
+            
destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
+        }
+        if (isUpdate("advisoryForSlowConsumers", includedProperties)) {
+            
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
+        }
+        if (isUpdate("advisoryForFastProducers", includedProperties)) {
+            
destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
+        }
+        if (isUpdate("advisoryWhenFull", includedProperties)) {
+            destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
+        }
+        if (isUpdate("includeBodyForAdvisory", includedProperties)) {
+            destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
+        }
+        if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
+            
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
+        }
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -321,6 +411,9 @@ public class PolicyEntry extends DestinationMapEntry {
         }
     }
 
+    private boolean isUpdate(String property, Set<String> includedProperties) {
+        return includedProperties == null || 
includedProperties.contains(property);
+    }
     // Properties
     // 
-------------------------------------------------------------------------
     public DispatchPolicy getDispatchPolicy() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a12c2106/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java
----------------------------------------------------------------------
diff --git 
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java
 
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java
index 8e4ed29..536909e 100644
--- 
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java
+++ 
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.plugin.java;
 
 import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -133,7 +135,7 @@ public class JavaRuntimeConfigurationBroker extends 
AbstractRuntimeConfiguration
     public void addNewPolicyEntry(PolicyEntry addition) {
         PolicyMap existingMap = getBrokerService().getDestinationPolicy();
         existingMap.put(addition.getDestination(), addition);
-        applyRetrospectively(addition);
+        PolicyEntryUtil.applyRetrospectively(this, addition, null);
         info("added policy for: " + addition.getDestination());
     }
 
@@ -156,6 +158,10 @@ public class JavaRuntimeConfigurationBroker extends 
AbstractRuntimeConfiguration
         modifyPolicyEntry(existing, false);
     }
 
+    public void modifyPolicyEntry(PolicyEntry existing, boolean 
createOrReplace) {
+        modifyPolicyEntry(existing, createOrReplace, null);
+    }
+
     /**
      * This method will modify an existing policy entry that matches the 
destination
      * set on the PolicyEntry passed in.  If createOrReplace is true, a new 
policy
@@ -165,10 +171,16 @@ public class JavaRuntimeConfigurationBroker extends 
AbstractRuntimeConfiguration
      * If createOrReplace is false, the policy update will only be applied if
      * the PolicyEntry reference already exists in the PolicyMap.
      *
+     * includedProperties is a list of properties that will be applied 
retrospectively. If
+     * the list is null, then all properties on the policy will be reapplied 
to the destination.
+     * This allows the ability to limit which properties are applied to 
existing destinations.
+     *
      * @param existing
      * @param createIfAbsent
+     * @param includedProperties - optional list of properties to apply 
retrospectively
      */
-    public void modifyPolicyEntry(PolicyEntry existing, boolean 
createOrReplace) {
+    public void modifyPolicyEntry(PolicyEntry existing, boolean 
createOrReplace,
+            Set<String> includedProperties) {
         PolicyMap existingMap = this.getBrokerService().getDestinationPolicy();
 
         //First just look up by the destination type to see if anything matches
@@ -194,7 +206,7 @@ public class JavaRuntimeConfigurationBroker extends 
AbstractRuntimeConfiguration
         //Make sure that at this point the passed in object and the entry in
         //the map are the same
         if (existingEntry != null && existingEntry.equals(existing)) {
-            applyRetrospectively(existingEntry);
+            PolicyEntryUtil.applyRetrospectively(this, existingEntry, 
includedProperties);
             this.info("updated policy for: " + existingEntry.getDestination());
         } else {
             throw new IllegalArgumentException("The policy can not be updated 
because it either does not exist or the PolicyEntry"
@@ -204,10 +216,6 @@ public class JavaRuntimeConfigurationBroker extends 
AbstractRuntimeConfiguration
         }
     }
 
-    protected void applyRetrospectively(PolicyEntry updatedEntry) {
-        PolicyEntryUtil.applyRetrospectively(this, updatedEntry);
-    }
-
     //authentication plugin
     public void updateSimpleAuthenticationPlugin(final 
SimpleAuthenticationPlugin updatedPlugin) {
         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a12c2106/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java
 
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java
index 7f38f5b..5ac135a 100644
--- 
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java
+++ 
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.plugin.util;
 
+import java.util.List;
 import java.util.Set;
 
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.Queue;
@@ -67,7 +69,27 @@ public class PolicyEntryUtil {
      * @param runtimeBroker
      * @param updatedEntry
      */
-    public static void applyRetrospectively(AbstractRuntimeConfigurationBroker 
runtimeBroker, PolicyEntry updatedEntry) {
+    public static void applyRetrospectively(AbstractRuntimeConfigurationBroker 
runtimeBroker,
+            PolicyEntry updatedEntry) {
+        PolicyEntryUtil.applyRetrospectively(runtimeBroker, updatedEntry, 
null);
+    }
+
+    /**
+     *
+     * Utility to properly apply an updated policy entry to all existing 
destinations that
+     * match this entry.  The destination will only be updated if the policy 
is the exact
+     * policy (most specific) that matches the destination.
+     *
+     * The includedProperties Set is optional and is used to specify a set 
of properties
+     * to apply retrospectively to the matching destinations. This allows only 
certain properties
+     * to be reapplied.  If the set is null then all properties will be 
applied.
+     *
+     * @param runtimeBroker
+     * @param updatedEntry
+     * @param includedProperties
+     */
+    public static void applyRetrospectively(AbstractRuntimeConfigurationBroker 
runtimeBroker,
+            PolicyEntry updatedEntry, Set<String> includedProperties) {
         RegionBroker regionBroker = (RegionBroker) 
runtimeBroker.getBrokerService().getRegionBroker();
         for (Destination destination : 
regionBroker.getDestinations(updatedEntry.getDestination())) {
             //Look up the policy that applies to the destination
@@ -78,13 +100,15 @@ public class PolicyEntryUtil {
             //currently just an identity check which is what we want
             if (updatedEntry.equals(specificyPolicy)){
                 Destination target = destination;
-                if (destination instanceof DestinationFilter) {
-                    target = ((DestinationFilter)destination).getNext();
+                while (target instanceof DestinationFilter) {
+                    target = ((DestinationFilter)target).getNext();
                 }
+                //If we are providing a list of properties to set then use them
+                //to set eligible properties that are in the 
includedProperties list
                 if (target.getActiveMQDestination().isQueue()) {
-                    updatedEntry.update((Queue) target);
+                    updatedEntry.update((Queue) target, includedProperties);
                 } else if (target.getActiveMQDestination().isTopic()) {
-                    updatedEntry.update((Topic) target);
+                    updatedEntry.update((Topic) target, includedProperties);
                 }
                 runtimeBroker.debug("applied update to:" + target);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a12c2106/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
 
b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
index accae06..29b655e 100644
--- 
a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
+++ 
b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
@@ -17,9 +17,12 @@
 package org.apache.activemq.java;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Session;
@@ -29,6 +32,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RuntimeConfigTestSupport;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -53,6 +59,11 @@ public class JavaPolicyEntryTest extends 
RuntimeConfigTestSupport {
                 (JavaRuntimeConfigurationBroker) 
brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
     }
 
+    /**
+     * Test modifying a policy
+     *
+     * @throws Exception
+     */
     @Test
     public void testMod() throws Exception {
         BrokerService brokerService = new BrokerService();
@@ -63,10 +74,8 @@ public class JavaPolicyEntryTest extends 
RuntimeConfigTestSupport {
         policyMap.setPolicyEntries(Arrays.asList(entry));
         brokerService.setDestinationPolicy(policyMap);
 
-
         startBroker(brokerService);
         assertTrue("broker alive", brokerService.isStarted());
-
         verifyQueueLimit("Before", 1024);
 
         //Reapply new limit
@@ -80,6 +89,150 @@ public class JavaPolicyEntryTest extends 
RuntimeConfigTestSupport {
         verifyQueueLimit("Before", 4194304);
     }
 
+    /**
+     * Test modifying a policy but only applying a subset of
+     * properties retroactively to existing destinations
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testModFilterProperties() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setMemoryLimit(1024);
+        entry.setMaxPageSize(500);
+        entry.setMaxBrowsePageSize(100);
+        policyMap.setPolicyEntries(Arrays.asList(entry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+        verifyQueueLimit("Before", 1024);
+        assertEquals(500, getQueue("Before").getMaxPageSize());
+        assertEquals(100, getQueue("Before").getMaxBrowsePageSize());
+
+        //Reapply new limit, add the property to the list of included 
properties
+        entry.setMemoryLimit(4194304);
+        entry.setMaxPageSize(300);
+        entry.setMaxBrowsePageSize(200);
+        Set<String> properties = new HashSet<>();
+        properties.add("memoryLimit");
+        properties.add("maxPageSize");
+        javaConfigBroker.modifyPolicyEntry(entry, false, properties);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        verifyQueueLimit("After", 4194304);
+        assertEquals(300, getQueue("After").getMaxPageSize());
+        assertEquals(200, getQueue("After").getMaxBrowsePageSize());
+
+       // change to existing dest, maxBrowsePageSize was not included
+        //in the property list so it should not have changed
+        verifyQueueLimit("Before", 4194304);
+        assertEquals(300, getQueue("Before").getMaxPageSize());
+        assertEquals(100, getQueue("Before").getMaxBrowsePageSize());
+    }
+
+    @Test
+    public void testModQueueAndTopic() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry qEntry = new PolicyEntry();
+        qEntry.setQueue(">");
+        qEntry.setPersistJMSRedelivered(true);
+        PolicyEntry tEntry = new PolicyEntry();
+        tEntry.setTopic(">");
+        tEntry.setLazyDispatch(true);
+        policyMap.setPolicyEntries(Arrays.asList(qEntry, tEntry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+        assertEquals(true, getQueue("queueBefore").isPersistJMSRedelivered());
+        assertEquals(true, getTopic("topicBefore").isLazyDispatch());
+
+        //Flip both flags, and list each property in its entry's included 
properties
+        qEntry.setPersistJMSRedelivered(false);
+        tEntry.setLazyDispatch(false);
+        Set<String> queueProperties = new HashSet<>();
+        queueProperties.add("persistJMSRedelivered");
+        Set<String> topicProperties = new HashSet<>();
+        topicProperties.add("lazyDispatch");
+        javaConfigBroker.modifyPolicyEntry(qEntry, false, queueProperties);
+        javaConfigBroker.modifyPolicyEntry(tEntry, false, topicProperties);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        assertEquals(false, getQueue("queueBefore").isPersistJMSRedelivered());
+        assertEquals(false, getTopic("topicBefore").isLazyDispatch());
+
+        assertEquals(false, getQueue("queueAfter").isPersistJMSRedelivered());
+        assertEquals(false, getTopic("topicAfter").isLazyDispatch());
+    }
+
+    /**
+     * Test that a property that is not part of the update methods (can't be 
changed after creation)
+     * will not be applied to existing destinations but will be applied to new 
destinations
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testModFilterExcludedProperty() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setEnableAudit(true);
+        policyMap.setPolicyEntries(Arrays.asList(entry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+        assertTrue(getQueue("Before").isEnableAudit());
+
+        //Disable audit on the policy and add enableAudit to the included 
properties
+        entry.setEnableAudit(false);
+        Set<String> properties = new HashSet<>();
+        properties.add("enableAudit");
+        javaConfigBroker.modifyPolicyEntry(entry, false, properties);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        //no change because enableAudit is excluded
+        assertTrue(getQueue("Before").isEnableAudit());
+
+        //A new destination should have the property changed
+        assertFalse(getQueue("After").isEnableAudit());
+    }
+
+    @Test
+    public void testModFilterPropertiesInvalid() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setMemoryLimit(1024);
+        policyMap.setPolicyEntries(Arrays.asList(entry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+        verifyQueueLimit("Before", 1024);
+
+        //use a property that doesn't exist, so nothing should be updated
+        entry.setMemoryLimit(4194304);
+        Set<String> properties = new HashSet<>();
+        properties.add("invalid");
+        javaConfigBroker.modifyPolicyEntry(entry, false, properties);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        //This should be unchanged as the list of properties only
+        //has an invalid property so nothing will be re-applied retrospectively
+        verifyQueueLimit("Before", 1024);
+
+        //A new destination should be updated because the policy was changed
+        verifyQueueLimit("After", 4194304);
+    }
+
     @Test
     public void testModNewPolicyObject() throws Exception {
         BrokerService brokerService = new BrokerService();
@@ -114,6 +267,8 @@ public class JavaPolicyEntryTest extends 
RuntimeConfigTestSupport {
 
     /**
      * Test that a new policy is added and applied
+     * Test that a new policy will be added when setting createOrReplace to 
true
+     * when calling modifyPolicyEntry
      *
      * @throws Exception
      */
@@ -143,6 +298,9 @@ public class JavaPolicyEntryTest extends 
RuntimeConfigTestSupport {
 
     /**
      * Test that a new policy is not added
+     * Pass a new policy to modifyPolicyEntry which should throw an exception
+     * because the policy didn't already exist
+     *
      * @throws Exception
      */
     @Test
@@ -185,10 +343,8 @@ public class JavaPolicyEntryTest extends 
RuntimeConfigTestSupport {
         policyMap.setPolicyEntries(Arrays.asList(entry));
         brokerService.setDestinationPolicy(policyMap);
 
-
         startBroker(brokerService);
         assertTrue("broker alive", brokerService.isStarted());
-
         verifyQueueLimit("Before", 1024);
 
         //Reapply new limit with new object that matches
@@ -369,6 +525,133 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
         verifyTopicLimit("test2.test.after", 4000l);
     }
 
+    @Test
+    public void testAllQueuePropertiesApplied() throws Exception {
+        testAllQueuePropertiesAppliedFilter(null);
+    }
+
+    /**
+     * Make sure all properties set on the filter Set are applied
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAllQueuePropertiesAppliedFilter() throws Exception {
+        testAllQueuePropertiesAppliedFilter(getQueuePropertySet());
+    }
+
+    /**
+     * Make sure all properties set on the filter Set are applied
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAllTopicPropertiesAppliedFilter() throws Exception {
+        testAllTopicPropertiesAppliedFilter(getTopicPropertySet());
+    }
+
+    @Test
+    public void testAllTopicPropertiesApplied() throws Exception {
+        testAllTopicPropertiesAppliedFilter(null);
+    }
+
+    private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+
+        //initial config
+        setAllDestPolicyProperties(entry, true, true, 10,
+                100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
+                30, true, true, true, true, true, true, true, true);
+        setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100,
+                100, true, true);
+
+        policyMap.setPolicyEntries(Arrays.asList(entry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        //validate config
+        assertAllDestPolicyProperties(getQueue("Before"), true, true, 10,
+                100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
+                30, true, true, true, true, true, true, true, true);
+        assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100,
+                100, true, true);
+
+
+        //change config
+        setAllDestPolicyProperties(entry, false, false, 100,
+                1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
+                300, false, false, false, false, false, false, false, false);
+        setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000,
+                1000, false, false);
+
+        javaConfigBroker.modifyPolicyEntry(entry, false, properties);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        assertAllDestPolicyProperties(getQueue("Before"), false, false, 100,
+                1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
+                300, false, false, false, false, false, false, false, false);
+        assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000,
+                1000, false, false);
+
+        //check new dest
+        assertAllDestPolicyProperties(getQueue("After"), false, false, 100,
+                1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
+                300, false, false, false, false, false, false, false, false);
+        assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000,
+                1000, false, false);
+    }
+
+    private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setTopic(">");
+
+        //initial config
+        setAllDestPolicyProperties(entry, true, true, 10,
+                100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
+                30, true, true, true, true, true, true, true, true);
+        setAllTopicPolicyProperties(entry, 10000, true);
+
+        policyMap.setPolicyEntries(Arrays.asList(entry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        //validate config
+        assertAllDestPolicyProperties(getTopic("Before"), true, true, 10,
+                100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
+                30, true, true, true, true, true, true, true, true);
+        assertAllTopicPolicyProperties(getTopic("Before"), 10000, true);
+
+
+        //change config
+        setAllDestPolicyProperties(entry, false, false, 100,
+                1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
+                300, false, false, false, false, false, false, false, false);
+        setAllTopicPolicyProperties(entry, 100000, false);
+
+        javaConfigBroker.modifyPolicyEntry(entry, false, properties);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        assertAllDestPolicyProperties(getTopic("Before"), false, false, 100,
+                1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
+                300, false, false, false, false, false, false, false, false);
+        assertAllTopicPolicyProperties(getTopic("Before"), 100000, false);
+
+        //check new dest
+        assertAllDestPolicyProperties(getTopic("After"), false, false, 100,
+                1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
+                300, false, false, false, false, false, false, false, false);
+        assertAllTopicPolicyProperties(getTopic("After"), 100000, false);
+    }
+
     private void verifyQueueLimit(String dest, int memoryLimit) throws Exception {
         ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection();
         try {
@@ -376,7 +659,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             session.createConsumer(session.createQueue(dest));
 
-            assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit());
+            assertEquals(memoryLimit, getQueue(dest).getMemoryUsage().getLimit());
         } finally {
             connection.close();
         }
@@ -389,9 +672,182 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             session.createConsumer(session.createTopic(dest));
 
-            assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit());
+            assertEquals(memoryLimit, getTopic(dest).getMemoryUsage().getLimit());
         } finally {
             connection.close();
         }
     }
+
+    private Queue getQueue(String queue) throws Exception {
+        return (Queue) brokerService.getRegionBroker().addDestination(
+                brokerService.getAdminConnectionContext(), new ActiveMQQueue(queue), false);
+    }
+
+    private Topic getTopic(String topic) throws Exception {
+        return (Topic) brokerService.getRegionBroker().addDestination(
+                brokerService.getAdminConnectionContext(), new ActiveMQTopic(topic), false);
+    }
+
+    private Set<String> getQueuePropertySet() {
+        Set<String> properties = new HashSet<>(getDestPropertySet());
+        properties.add("memoryLimit");
+        properties.add("useConsumerPriority");
+        properties.add("strictOrderDispatch");
+        properties.add("optimizedDispatch");
+        properties.add("lazyDispatch");
+        properties.add("timeBeforeDispatchStarts");
+        properties.add("consumersBeforeDispatchStarts");
+        properties.add("allConsumersExclusiveByDefault");
+        properties.add("persistJMSRedelivered");
+        return properties;
+    }
+
+    private Set<String> getTopicPropertySet() {
+        Set<String> properties = new HashSet<>(getDestPropertySet());
+        properties.add("memoryLimit");
+        properties.add("lazyDispatch");
+        return properties;
+    }
+
+    private Set<String> getDestPropertySet() {
+        Set<String> properties = new HashSet<>();
+        properties.add("producerFlowControl");
+        properties.add("alwaysRetroactive");
+        properties.add("blockedProducerWarningInterval");
+        properties.add("maxPageSize");
+        properties.add("maxBrowsePageSize");
+        properties.add("minimumMessageSize");
+        properties.add("maxExpirePageSize");
+        properties.add("cursorMemoryHighWaterMark");
+        properties.add("storeUsageHighWaterMark");
+        properties.add("gcInactiveDestinations");
+        properties.add("gcWithNetworkConsumers");
+        properties.add("inactiveTimeoutBeforeGC");
+        properties.add("reduceMemoryFootprint");
+        properties.add("doOptimizeMessageStore");
+        properties.add("optimizeMessageStoreInFlightLimit");
+        properties.add("advisoryForConsumed");
+        properties.add("advisoryForDelivery");
+        properties.add("advisoryForDiscardingMessages");
+        properties.add("advisoryForSlowConsumers");
+        properties.add("advisoryForFastProducers");
+        properties.add("advisoryWhenFull");
+        properties.add("includeBodyForAdvisory");
+        properties.add("sendAdvisoryIfNoConsumers");
+        return properties;
+
+    }
+
+    private void setAllQueuePolicyProperties(PolicyEntry entry, long memoryLimit, boolean useConsumerPriority,
+            boolean strictOrderDispatch, boolean optimizedDispatch, boolean lazyDispatch,
+            int timeBeforeDispatchStarts, int consumersBeforeDispatchStarts, boolean allConsumersExclusiveByDefault,
+            boolean persistJMSRedelivered) {
+
+        entry.setMemoryLimit(memoryLimit);
+        entry.setUseConsumerPriority(useConsumerPriority);
+        entry.setStrictOrderDispatch(strictOrderDispatch);
+        entry.setOptimizedDispatch(optimizedDispatch);
+        entry.setLazyDispatch(lazyDispatch);
+        entry.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
+        entry.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
+        entry.setAllConsumersExclusiveByDefault(allConsumersExclusiveByDefault);
+        entry.setPersistJMSRedelivered(persistJMSRedelivered);
+    }
+
+    private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, boolean lazyDispatch) {
+        entry.setMemoryLimit(memoryLimit);
+        entry.setLazyDispatch(lazyDispatch);
+    }
+
+    private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl,
+            boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
+            int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
+            int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
+            long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
+            int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
+            boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers,
+            boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
+
+        entry.setProducerFlowControl(producerFlowControl);
+        entry.setAlwaysRetroactive(alwaysRetroactive);
+        entry.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
+        entry.setMaxPageSize(maxPageSize);
+        entry.setMaxBrowsePageSize(maxBrowsePageSize);
+        entry.setMinimumMessageSize(minimumMessageSize);
+        entry.setMaxExpirePageSize(maxExpirePageSize);
+        entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
+        entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark);
+        entry.setGcInactiveDestinations(gcInactiveDestinations);
+        entry.setGcWithNetworkConsumers(gcWithNetworkConsumers);
+        entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC);
+        entry.setReduceMemoryFootprint(reduceMemoryFootprint);
+        entry.setDoOptimzeMessageStorage(doOptimizeMessageStore);
+        entry.setOptimizeMessageStoreInFlightLimit(optimizeMessageStoreInFlightLimit);
+        entry.setAdvisoryForConsumed(advisoryForConsumed);
+        entry.setAdvisoryForDelivery(advisoryForDelivery);
+        entry.setAdvisoryForDiscardingMessages(advisoryForDiscardingMessages);
+        entry.setAdvisoryForSlowConsumers(advisoryForSlowConsumers);
+        entry.setAdvisoryForFastProducers(advisoryForFastProducers);
+        entry.setAdvisoryWhenFull(advisoryWhenFull);
+        entry.setIncludeBodyForAdvisory(includeBodyForAdvisory);
+        entry.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
+    }
+
+    private void assertAllQueuePolicyProperties(Queue queue, long memoryLimit, boolean useConsumerPriority,
+            boolean strictOrderDispatch, boolean optimizedDispatch, boolean lazyDispatch,
+            int timeBeforeDispatchStarts, int consumersBeforeDispatchStarts, boolean allConsumersExclusiveByDefault,
+            boolean persistJMSRedelivered) {
+
+        assertEquals(memoryLimit, queue.getMemoryUsage().getLimit());
+        assertEquals(useConsumerPriority, queue.isUseConsumerPriority());
+        assertEquals(strictOrderDispatch, queue.isStrictOrderDispatch());
+        assertEquals(optimizedDispatch, queue.isOptimizedDispatch());
+        assertEquals(lazyDispatch, queue.isLazyDispatch());
+        assertEquals(timeBeforeDispatchStarts, queue.getTimeBeforeDispatchStarts());
+        assertEquals(consumersBeforeDispatchStarts, queue.getConsumersBeforeDispatchStarts());
+        assertEquals(allConsumersExclusiveByDefault, queue.isAllConsumersExclusiveByDefault());
+        assertEquals(persistJMSRedelivered, queue.isPersistJMSRedelivered());
+
+    }
+
+    private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boolean lazyDispatch) {
+        assertEquals(memoryLimit, topic.getMemoryUsage().getLimit());
+        assertEquals(lazyDispatch, topic.isLazyDispatch());
+    }
+
+    private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl,
+            boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
+            int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
+            int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
+            long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
+            int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
+            boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers,
+            boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
+
+
+        assertEquals(producerFlowControl, dest.isProducerFlowControl());
+        assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive());
+        assertEquals(blockedProducerWarningInterval, dest.getBlockedProducerWarningInterval());
+        assertEquals(maxPageSize, dest.getMaxPageSize());
+        assertEquals(maxBrowsePageSize, dest.getMaxBrowsePageSize());
+        assertEquals(minimumMessageSize, dest.getMinimumMessageSize());
+        assertEquals(maxExpirePageSize, dest.getMaxExpirePageSize());
+        assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark());
+        assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark());
+        assertEquals(gcInactiveDestinations, dest.isGcIfInactive());
+        assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers());
+        assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC());
+        assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint());
+        assertEquals(doOptimizeMessageStore, dest.isDoOptimzeMessageStorage());
+        assertEquals(optimizeMessageStoreInFlightLimit, dest.getOptimizeMessageStoreInFlightLimit());
+        assertEquals(advisoryForConsumed, dest.isAdvisoryForConsumed());
+        assertEquals(advisoryForDelivery, dest.isAdvisoryForDelivery());
+        assertEquals(advisoryForDiscardingMessages, dest.isAdvisoryForDiscardingMessages());
+        assertEquals(advisoryForSlowConsumers, dest.isAdvisoryForSlowConsumers());
+        assertEquals(advisoryForFastProducers, dest.isAdvisoryForFastProducers());
+        assertEquals(advisoryWhenFull, dest.isAdvisoryWhenFull());
+        assertEquals(includeBodyForAdvisory, dest.isIncludeBodyForAdvisory());
+        assertEquals(sendAdvisoryIfNoConsumers, dest.isSendAdvisoryIfNoConsumers());
+
+    }
 }

Reply via email to