This is an automated email from the ASF dual-hosted git repository. gtully pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new e0b1621 ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster e0b1621 is described below commit e0b16217a1d198a19bbc8ed2c6cfe9f09bb8b993 Author: gtully <gary.tu...@gmail.com> AuthorDate: Wed Dec 1 15:27:54 2021 +0000 ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster --- .../balancing/BrokerBalancerConfiguration.java | 15 +- ...ration.java => NamedPropertyConfiguration.java} | 6 +- .../deployers/impl/FileConfigurationParser.java | 25 +- .../core/server/balancing/BrokerBalancer.java | 29 +- .../server/balancing/BrokerBalancerManager.java | 29 +- .../balancing/policies/ConsistentHashPolicy.java | 6 +- .../balancing/policies/DefaultPolicyFactory.java | 49 ---- .../server/balancing/policies/PolicyFactory.java | 6 +- .../balancing/policies/PolicyFactoryResolver.java | 19 +- .../transformer/ConsistentHashModulo.java | 47 +++ .../KeyTransformer.java} | 14 +- .../TransformerFactory.java} | 8 +- .../transformer/TransformerFactoryResolver.java | 62 ++++ .../resources/schema/artemis-configuration.xsd | 27 ++ .../core/config/impl/FileConfigurationTest.java | 11 +- .../balancing/BrokerBalancerManagerTest.java | 27 +- .../core/server/balancing/BrokerBalancerTest.java | 43 ++- .../policies/PolicyFactoryResolverTest.java} | 29 +- .../transformer/ConsistentHashModuloTest.java | 55 ++++ .../TransformerFactoryResolverTest.java | 43 +++ .../artemis/tests/util/ActiveMQTestBase.java | 2 +- .../resources/ConfigurationTest-full-config.xml | 8 + .../ConfigurationTest-xinclude-config.xml | 8 + docs/user-manual/en/broker-balancers.md | 7 + .../balancing/AutoClientIDShardClusterTest.java | 325 +++++++++++++++++++++ .../integration/balancing/BalancingTestBase.java | 10 +- .../tests/integration/balancing/TargetKeyTest.java | 
9 +- 27 files changed, 778 insertions(+), 141 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java index 1da1c04..e20d033 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java @@ -28,7 +28,8 @@ public class BrokerBalancerConfiguration implements Serializable { private String localTargetFilter = null; private int cacheTimeout = -1; private PoolConfiguration poolConfiguration = null; - private PolicyConfiguration policyConfiguration = null; + private NamedPropertyConfiguration policyConfiguration = null; + private NamedPropertyConfiguration transformerConfiguration = null; public String getName() { return name; @@ -75,11 +76,11 @@ public class BrokerBalancerConfiguration implements Serializable { return this; } - public PolicyConfiguration getPolicyConfiguration() { + public NamedPropertyConfiguration getPolicyConfiguration() { return policyConfiguration; } - public BrokerBalancerConfiguration setPolicyConfiguration(PolicyConfiguration policyConfiguration) { + public BrokerBalancerConfiguration setPolicyConfiguration(NamedPropertyConfiguration policyConfiguration) { this.policyConfiguration = policyConfiguration; return this; } @@ -92,4 +93,12 @@ public class BrokerBalancerConfiguration implements Serializable { this.poolConfiguration = poolConfiguration; return this; } + + public void setTransformerConfiguration(NamedPropertyConfiguration configuration) { + this.transformerConfiguration = configuration; + } + + public NamedPropertyConfiguration getTransformerConfiguration() { + return transformerConfiguration; + } } diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java similarity index 85% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java index f1f8630..eefa61d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/NamedPropertyConfiguration.java @@ -20,7 +20,7 @@ package org.apache.activemq.artemis.core.config.balancing; import java.io.Serializable; import java.util.Map; -public class PolicyConfiguration implements Serializable { +public class NamedPropertyConfiguration implements Serializable { private String name; private Map<String, String> properties; @@ -29,7 +29,7 @@ public class PolicyConfiguration implements Serializable { return name; } - public PolicyConfiguration setName(String name) { + public NamedPropertyConfiguration setName(String name) { this.name = name; return this; } @@ -38,7 +38,7 @@ public class PolicyConfiguration implements Serializable { return properties; } - public PolicyConfiguration setProperties(Map<String, String> properties) { + public NamedPropertyConfiguration setProperties(Map<String, String> properties) { this.properties = properties; return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 1e92a83..bd27103 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -47,7 +47,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; -import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration; +import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; @@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; @@ -2653,7 +2654,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { brokerBalancerConfiguration.setCacheTimeout(getInteger(e, "cache-timeout", brokerBalancerConfiguration.getCacheTimeout(), Validators.MINUS_ONE_OR_GE_ZERO)); - PolicyConfiguration policyConfiguration = null; + NamedPropertyConfiguration policyConfiguration = null; PoolConfiguration poolConfiguration = null; NodeList children = e.getChildNodes(); @@ -2661,20 +2662,34 @@ public final class 
FileConfigurationParser extends XMLConfigurationUtil { Node child = children.item(j); if (child.getNodeName().equals("policy")) { - policyConfiguration = new PolicyConfiguration(); - parsePolicyConfiguration((Element)child, policyConfiguration); + policyConfiguration = new NamedPropertyConfiguration(); + parsePolicyConfiguration((Element) child, policyConfiguration); brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration); } else if (child.getNodeName().equals("pool")) { poolConfiguration = new PoolConfiguration(); parsePoolConfiguration((Element) child, config, poolConfiguration); brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration); + } else if (child.getNodeName().equals("local-target-key-transformer")) { + policyConfiguration = new NamedPropertyConfiguration(); + parseTransformerConfiguration((Element) child, policyConfiguration); + brokerBalancerConfiguration.setTransformerConfiguration(policyConfiguration); } } config.getBalancerConfigurations().add(brokerBalancerConfiguration); } - private void parsePolicyConfiguration(final Element e, final PolicyConfiguration policyConfiguration) throws ClassNotFoundException { + private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException { + String name = e.getAttribute("name"); + + TransformerFactoryResolver.getInstance().resolve(name); + + policyConfiguration.setName(name); + + policyConfiguration.setProperties(getMapOfChildPropertyElements(e)); + } + + private void parsePolicyConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException { String name = e.getAttribute("name"); PolicyFactoryResolver.getInstance().resolve(name); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java index 
c256209..a39c738 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.Target; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver; import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult; +import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.jboss.logging.Logger; @@ -40,7 +41,6 @@ public class BrokerBalancer implements ActiveMQComponent { public static final String CLIENT_ID_PREFIX = ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX + "balancer.client."; - private final String name; private final TargetKey targetKey; @@ -55,6 +55,8 @@ public class BrokerBalancer implements ActiveMQComponent { private final Policy policy; + private final KeyTransformer transformer; + private final Cache<String, TargetResult> cache; private volatile boolean started = false; @@ -93,11 +95,21 @@ public class BrokerBalancer implements ActiveMQComponent { } - public BrokerBalancer(final String name, final TargetKey targetKey, final String targetKeyFilter, final Target localTarget, final String localTargetFilter, final Pool pool, final Policy policy, final int cacheTimeout) { + public BrokerBalancer(final String name, + final TargetKey targetKey, + final String targetKeyFilter, + final Target localTarget, + final String localTargetFilter, + final Pool pool, + final Policy policy, + KeyTransformer transformer, + final int cacheTimeout) { this.name = name; this.targetKey = targetKey; + this.transformer = transformer; + this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter); 
this.localTarget = new TargetResult(localTarget); @@ -149,7 +161,7 @@ public class BrokerBalancer implements ActiveMQComponent { public TargetResult getTarget(String key) { - if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) { + if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) { if (logger.isDebugEnabled()) { logger.debug("The " + targetKey + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern()); } @@ -201,4 +213,15 @@ public class BrokerBalancer implements ActiveMQComponent { return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT; } + + private String transform(String key) { + String result = key; + if (transformer != null) { + result = transformer.transform(key); + if (logger.isDebugEnabled()) { + logger.debug("Key: " + key + ", transformed to " + result); + } + } + return result; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java index 10afe15..f9b8002 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java @@ -21,7 +21,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.cluster.DiscoveryGroup; import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; -import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration; +import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration; import 
org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -39,6 +39,9 @@ import org.apache.activemq.artemis.core.server.balancing.targets.ActiveMQTargetF import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget; import org.apache.activemq.artemis.core.server.balancing.targets.Target; import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory; +import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer; +import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactory; +import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.jboss.logging.Logger; @@ -95,13 +98,19 @@ public final class BrokerBalancerManager implements ActiveMQComponent { } Policy policy = null; - PolicyConfiguration policyConfiguration = config.getPolicyConfiguration(); + NamedPropertyConfiguration policyConfiguration = config.getPolicyConfiguration(); if (policyConfiguration != null) { policy = deployPolicy(policyConfiguration, pool); } + KeyTransformer transformer = null; + NamedPropertyConfiguration transformerConfiguration = config.getTransformerConfiguration(); + if (transformerConfiguration != null) { + transformer = deployTransformer(transformerConfiguration); + } + BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(), - localTarget, config.getLocalTargetFilter(), pool, policy, config.getCacheTimeout()); + localTarget, config.getLocalTargetFilter(), pool, policy, transformer, config.getCacheTimeout()); balancerControllers.put(balancer.getName(), balancer); @@ -160,10 +169,10 @@ public final class BrokerBalancerManager implements ActiveMQComponent { return pool; } - private Policy deployPolicy(PolicyConfiguration policyConfig, Pool pool) throws 
ClassNotFoundException { + private Policy deployPolicy(NamedPropertyConfiguration policyConfig, Pool pool) throws ClassNotFoundException { PolicyFactory policyFactory = PolicyFactoryResolver.getInstance().resolve(policyConfig.getName()); - Policy policy = policyFactory.createPolicy(policyConfig.getName()); + Policy policy = policyFactory.create(); policy.init(policyConfig.getProperties()); @@ -174,6 +183,16 @@ public final class BrokerBalancerManager implements ActiveMQComponent { return policy; } + private KeyTransformer deployTransformer(NamedPropertyConfiguration configuration) throws Exception { + TransformerFactory factory = TransformerFactoryResolver.getInstance().resolve(configuration.getName()); + + KeyTransformer transformer = factory.create(); + + transformer.init(configuration.getProperties()); + + return transformer; + } + public BrokerBalancer getBalancer(String name) { return balancerControllers.get(name); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java index 77d4076..4c0dd46 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/ConsistentHashPolicy.java @@ -31,10 +31,6 @@ public class ConsistentHashPolicy extends AbstractPolicy { super(NAME); } - protected ConsistentHashPolicy(String name) { - super(name); - } - @Override public Target selectTarget(List<Target> targets, String key) { if (targets.size() > 1) { @@ -60,7 +56,7 @@ public class ConsistentHashPolicy extends AbstractPolicy { return null; } - private int getHash(String str) { + public static int getHash(String str) { final int FNV_INIT = 0x811c9dc5; final int FNV_PRIME = 0x01000193; diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java deleted file mode 100644 index aa39787..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/DefaultPolicyFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.activemq.artemis.core.server.balancing.policies; - -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; - -public class DefaultPolicyFactory extends PolicyFactory { - private static final Map<String, Supplier<AbstractPolicy>> supportedPolicies = new HashMap<>(); - - static { - supportedPolicies.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy()); - supportedPolicies.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy()); - supportedPolicies.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy()); - supportedPolicies.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy()); - } - - @Override - public String[] getSupportedPolicies() { - return supportedPolicies.keySet().toArray(new String[supportedPolicies.size()]); - } - - @Override - public AbstractPolicy createPolicy(String policyName) { - Supplier<AbstractPolicy> policySupplier = supportedPolicies.get(policyName); - - if (policySupplier == null) { - throw new IllegalArgumentException("Policy not supported: " + policyName); - } - - return policySupplier.get(); - } -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java index 4c745ee..227fda9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java @@ -17,8 +17,6 @@ package org.apache.activemq.artemis.core.server.balancing.policies; -public abstract class PolicyFactory { - public abstract String[] getSupportedPolicies(); - - public abstract Policy createPolicy(String policyName); +public interface PolicyFactory { + Policy create(); } diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java index dab4f93..45a61db 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolver.java @@ -36,7 +36,10 @@ public class PolicyFactoryResolver { private final Map<String, PolicyFactory> policyFactories = new HashMap<>(); private PolicyFactoryResolver() { - registerPolicyFactory(new DefaultPolicyFactory()); + policyFactories.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy()); + policyFactories.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy()); + policyFactories.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy()); + policyFactories.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy()); loadPolicyFactories(); } @@ -56,19 +59,15 @@ public class PolicyFactoryResolver { PolicyFactory.class, BrokerBalancer.class.getClassLoader()); for (PolicyFactory policyFactory : serviceLoader) { - registerPolicyFactory(policyFactory); + policyFactories.put(keyFromClassName(policyFactory.getClass().getName()), policyFactory); } } - public void registerPolicyFactory(PolicyFactory policyFactory) { - for (String policyName : policyFactory.getSupportedPolicies()) { - policyFactories.put(policyName, policyFactory); - } + public void registerPolicyFactory(String name, PolicyFactory policyFactory) { + policyFactories.put(name, policyFactory); } - public void unregisterPolicyFactory(PolicyFactory policyFactory) { - for (String policyName : policyFactory.getSupportedPolicies()) { - policyFactories.remove(policyName, policyFactory); - } + String keyFromClassName(String name) { + return name.substring(0, name.indexOf("PolicyFactory")); } } diff 
--git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java new file mode 100644 index 0000000..a092e86 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModulo.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.core.server.balancing.transformer; + +import java.util.Map; + +import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver; + +public class ConsistentHashModulo implements KeyTransformer { + public static final String NAME = "CONSISTENT_HASH_MODULO"; + public static final String MODULO = "modulo"; + int modulo = 0; + + @Override + public String transform(String str) { + if (TargetKeyResolver.DEFAULT_KEY_VALUE.equals(str)) { + // we only want to transform resolved keys + return str; + } + if (modulo == 0) { + return str; + } + int hash = ConsistentHashPolicy.getHash(str); + return String.valueOf( hash % modulo ); + } + + @Override + public void init(Map<String, String> properties) { + modulo = Integer.parseInt(properties.get(MODULO)); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java similarity index 76% copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java index 4c745ee..dc0224e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/KeyTransformer.java @@ -15,10 +15,16 @@ * limitations under the License. 
*/ -package org.apache.activemq.artemis.core.server.balancing.policies; +package org.apache.activemq.artemis.core.server.balancing.transformer; -public abstract class PolicyFactory { - public abstract String[] getSupportedPolicies(); +import java.util.Map; - public abstract Policy createPolicy(String policyName); +public interface KeyTransformer { + + default void init(Map<String, String> properties) { + } + + default String transform(String key) { + return key; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java similarity index 78% copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java index 4c745ee..af60fda 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactory.java @@ -15,10 +15,8 @@ * limitations under the License. 
*/ -package org.apache.activemq.artemis.core.server.balancing.policies; +package org.apache.activemq.artemis.core.server.balancing.transformer; -public abstract class PolicyFactory { - public abstract String[] getSupportedPolicies(); - - public abstract Policy createPolicy(String policyName); +public interface TransformerFactory { + KeyTransformer create(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java new file mode 100644 index 0000000..46dc635 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolver.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.core.server.balancing.transformer; + +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer; + +public class TransformerFactoryResolver { + private static TransformerFactoryResolver instance; + + public static TransformerFactoryResolver getInstance() { + if (instance == null) { + instance = new TransformerFactoryResolver(); + } + return instance; + } + + private final Map<String, TransformerFactory> factories = new HashMap<>(); + + private TransformerFactoryResolver() { + factories.put(ConsistentHashModulo.NAME, () -> new ConsistentHashModulo()); + loadFactories(); // let service loader override + } + + public TransformerFactory resolve(String policyName) throws ClassNotFoundException { + TransformerFactory factory = factories.get(policyName); + if (factory == null) { + throw new ClassNotFoundException("No TransformerFactory found for " + policyName); + } + return factory; + } + + private void loadFactories() { + ServiceLoader<TransformerFactory> serviceLoader = ServiceLoader.load( + TransformerFactory.class, BrokerBalancer.class.getClassLoader()); + for (TransformerFactory factory : serviceLoader) { + factories.put(keyFromClassName(factory.getClass().getName()), factory); + } + } + + String keyFromClassName(String name) { + return name.substring(0, name.indexOf("TransformerFactory")); + } +} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 27d3134..6d29d83 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2136,6 +2136,13 @@ </xsd:documentation> </xsd:annotation> </xsd:element> + <xsd:element name="local-target-key-transformer" type="brokerBalancerKeyTransformerType" maxOccurs="1" minOccurs="0"> + 
<xsd:annotation> + <xsd:documentation> + the local target key transformer configuration + </xsd:documentation> + </xsd:annotation> + </xsd:element> </xsd:sequence> <xsd:attribute name="name" type="xsd:string" use="required"> <xsd:annotation> @@ -2176,6 +2183,26 @@ <xsd:attributeGroup ref="xml:specialAttrs"/> </xsd:complexType> + <xsd:complexType name="brokerBalancerKeyTransformerType"> + <xsd:sequence> + <xsd:element ref="property" maxOccurs="unbounded" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + properties to configure a key transformer + </xsd:documentation> + </xsd:annotation> + </xsd:element> + </xsd:sequence> + <xsd:attribute name="name" type="xsd:ID" use="required"> + <xsd:annotation> + <xsd:documentation> + the name of the policy + </xsd:documentation> + </xsd:annotation> + </xsd:attribute> + <xsd:attributeGroup ref="xml:specialAttrs"/> + </xsd:complexType> + <xsd:complexType name="brokerBalancerPoolType"> <xsd:sequence maxOccurs="unbounded"> <xsd:element name="username" type="xsd:string" maxOccurs="1" minOccurs="0"> diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index d33b3ee..344aae0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -76,6 +76,8 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import static org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo.MODULO; + public class FileConfigurationTest extends ConfigurationImplTest { @BeforeClass @@ -265,13 +267,20 @@ public class FileConfigurationTest extends ConfigurationImplTest { } } - Assert.assertEquals(4, conf.getBalancerConfigurations().size()); + Assert.assertEquals(5, 
conf.getBalancerConfigurations().size()); for (BrokerBalancerConfiguration bc : conf.getBalancerConfigurations()) { if (bc.getName().equals("simple-local")) { Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID); Assert.assertNotNull(bc.getLocalTargetFilter()); Assert.assertNotNull(bc.getTargetKeyFilter()); Assert.assertNull(bc.getPolicyConfiguration()); + } else if (bc.getName().equals("simple-local-with-transformer")) { + Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID); + Assert.assertNotNull(bc.getLocalTargetFilter()); + Assert.assertNotNull(bc.getTargetKeyFilter()); + Assert.assertNull(bc.getPolicyConfiguration()); + Assert.assertNotNull(bc.getTransformerConfiguration()); + Assert.assertNotNull(bc.getTransformerConfiguration().getProperties().get(MODULO)); } else if (bc.getName().equals("simple-balancer")) { Assert.assertEquals(bc.getTargetKey(), TargetKey.USER_NAME); Assert.assertNull(bc.getLocalTargetFilter()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java index b98c7ec..a5bb93d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java @@ -17,12 +17,16 @@ package org.apache.activemq.artemis.core.server.balancing; +import java.util.HashMap; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; -import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration; +import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration; import 
org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.junit.After; import org.junit.Before; @@ -62,7 +66,7 @@ public class BrokerBalancerManagerTest { BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration(); brokerBalancerConfiguration.setName("partition-local-pool"); - PolicyConfiguration policyConfig = new PolicyConfiguration(); + NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration(); policyConfig.setName(ConsistentHashPolicy.NAME); brokerBalancerConfiguration.setPolicyConfiguration(policyConfig); @@ -84,4 +88,23 @@ public class BrokerBalancerManagerTest { underTest.deployBrokerBalancer(brokerBalancerConfiguration); } + + @Test() + public void deployLocalOnlyWithPolicy() throws Exception { + + ManagementService mockManagementService = Mockito.mock(ManagementService.class); + Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService); + + BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration(); + brokerBalancerConfiguration.setName("partition-local-consistent-hash").setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(String.valueOf(2)); + NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration(); + policyConfig.setName(ConsistentHashModulo.NAME); + HashMap<String, String> properties = new HashMap<>(); + properties.put(ConsistentHashModulo.MODULO, String.valueOf(2)); + policyConfig.setProperties(properties); + brokerBalancerConfiguration.setTransformerConfiguration(policyConfig); + + + underTest.deployBrokerBalancer(brokerBalancerConfiguration); + } } \ No newline at end of file diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java index 7f8ca0e..732ea53 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.balancing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import org.apache.activemq.artemis.api.core.SimpleString; @@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget; import org.apache.activemq.artemis.core.server.balancing.targets.Target; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult; -import org.junit.After; +import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -44,38 +43,34 @@ public class BrokerBalancerTest { @Before public void setUp() { - ActiveMQServer mockServer = mock(ActiveMQServer.class); Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID")); - localTarget = new LocalTarget(null, mockServer); + } + @Test + public void getTarget() { Pool pool = null; Policy policy = null; underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}", - localTarget, "^FOO.*", pool, policy, 0); - try { - underTest.start(); - } catch (Exception e) { - fail(e.getMessage()); - } - } - - @After - public void after() { - if (underTest != null) { - try { - underTest.stop(); - } catch (Exception e) { - fail(e.getMessage()); - } - } + localTarget, "^FOO.*", pool, policy, null, 0); + 
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget()); + assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE")); } @Test - public void getTarget() { - assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget()); - assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE")); + public void getLocalTargetWithTransformer() throws Exception { + Pool pool = null; + Policy policy = null; + KeyTransformer keyTransformer = new KeyTransformer() { + @Override + public String transform(String key) { + return key.substring("TRANSFORM_TO".length() + 1); + } + }; + underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}", + localTarget, "^FOO.*", pool, policy, keyTransformer, 0); + assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget()); } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java similarity index 52% copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java copy to artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java index 4c745ee..0804ba4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactory.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/policies/PolicyFactoryResolverTest.java @@ -14,11 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.activemq.artemis.core.server.balancing.policies; -public abstract class PolicyFactory { - public abstract String[] getSupportedPolicies(); +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class PolicyFactoryResolverTest { + + @Test + public void resolveOk() throws Exception { + PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance(); + assertNotNull(instance.resolve(ConsistentHashPolicy.NAME)); + } + + @Test(expected = ClassNotFoundException.class) + public void resolveError() throws Exception { + PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance(); + assertNotNull(instance.resolve("NOT PRESENT")); + } - public abstract Policy createPolicy(String policyName); -} + @Test + public void keyFromName() throws Exception { + PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance(); + assertEquals("New", instance.keyFromClassName("NewPolicyFactory")); + } +} \ No newline at end of file diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java new file mode 100644 index 0000000..137181b --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/ConsistentHashModuloTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.balancing.transformer; + +import java.util.HashMap; + +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class ConsistentHashModuloTest { + + @Test + public void transform() { + ConsistentHashModulo underTest = new ConsistentHashModulo(); + + assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, underTest.transform(TargetKeyResolver.DEFAULT_KEY_VALUE)); + + assertEquals("AA", underTest.transform("AA")); // default modulo 0 does nothing + + HashMap<String, String> properties = new HashMap<>(); + + final int modulo = 2; + properties.put(ConsistentHashModulo.MODULO, String.valueOf(modulo)); + underTest.init(properties); + + String hash1 = underTest.transform("AAA"); + int v1 = Integer.parseInt(hash1); + + String hash2 = underTest.transform("BBB"); + int v2 = Integer.parseInt(hash2); + + assertNotEquals(hash1, hash2); + assertNotEquals(v1, v2); + assertTrue(v1 < modulo && v2 < modulo); + } +} \ No newline at end of file diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java new file mode 100644 index 0000000..d102da1 --- /dev/null +++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/transformer/TransformerFactoryResolverTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.balancing.transformer; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TransformerFactoryResolverTest { + + @Test + public void resolveOk() throws Exception { + TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance(); + assertNotNull(instance.resolve(ConsistentHashModulo.NAME)); + } + + @Test(expected = ClassNotFoundException.class) + public void resolveError() throws Exception { + TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance(); + assertNotNull(instance.resolve("NOT PRESENT")); + } + + @Test + public void keyFromName() throws Exception { + TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance(); + assertEquals("New", instance.keyFromClassName("NewTransformerFactory")); + } +} \ No newline at end of file diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 7887458..7d6b04e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1964,7 +1964,7 @@ public abstract class ActiveMQTestBase extends Assert { } } - if (bindingCount == expectedBindingCount && totConsumers == expectedConsumerCount) { + if (bindingCount == expectedBindingCount && (expectedConsumerCount == -1 || totConsumers == expectedConsumerCount)) { return true; } diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 0a772b1..5c58d82 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -158,6 +158,14 @@ <target-key-filter>^[^.]+</target-key-filter> <local-target-filter>DEFAULT</local-target-filter> </broker-balancer> + <broker-balancer name="simple-local-with-transformer"> + <target-key>CLIENT_ID</target-key> + <target-key-filter>^[^.]+</target-key-filter> + <local-target-filter>DEFAULT</local-target-filter> + <local-target-key-transformer name="CONSISTENT_HASH_MODULO"> + <property key="modulo" value="2"></property> + </local-target-key-transformer> + </broker-balancer> <broker-balancer name="simple-balancer"> <target-key>USER_NAME</target-key> <policy name="FIRST_ELEMENT"/> diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index faaef30..e5edf10 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -149,6 +149,14 @@ <target-key-filter>^[^.]+</target-key-filter> 
<local-target-filter>DEFAULT</local-target-filter> </broker-balancer> + <broker-balancer name="simple-local-with-transformer"> + <target-key>CLIENT_ID</target-key> + <target-key-filter>^[^.]+</target-key-filter> + <local-target-filter>DEFAULT</local-target-filter> + <local-target-key-transformer name="CONSISTENT_HASH_MODULO"> + <property key="modulo" value="2"></property> + </local-target-key-transformer> + </broker-balancer> <broker-balancer name="simple-balancer"> <target-key>USER_NAME</target-key> <policy name="FIRST_ELEMENT"/> diff --git a/docs/user-manual/en/broker-balancers.md b/docs/user-manual/en/broker-balancers.md index be42656..c821877 100644 --- a/docs/user-manual/en/broker-balancers.md +++ b/docs/user-manual/en/broker-balancers.md @@ -106,12 +106,19 @@ So a broker balancer with the cache enabled doesn't strictly follow the configur By default, the cache is enabled and will never time out. See below for more details about setting the `cache-timeout` parameter. +## Key transformers +A `local-target-key-transformer` transforms the target key before it is matched against the `local-target-filter`. One use case is +`CLIENT_ID` sharding across a cluster of N brokers. With a consistent hash modulo N transformation, each client ID +maps exclusively to just one of the brokers. The included transformers are: +* `CONSISTENT_HASH_MODULO`, which takes a single `modulo` property to configure the bound of the transformed key. 
+ ## Defining broker balancers A broker balancer is defined by the `broker-balancer` element; it includes the following items: * the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor; * the `target-key` element defines the key used to select a target broker; the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details; * the `target-key-filter` element defines a regular expression to filter the resolved keys; * the `local-target-filter` element defines a regular expression to match the keys that have to return a local target; +* the `local-target-key-transformer` element defines a key transformer that is applied to the target key before it is matched against the `local-target-filter`, see [key transformers](#key-transformers); * the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration; * the `pool` element defines the pool to group the target brokers, see [pools](#pools); * the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java new file mode 100644 index 0000000..18b605f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.balancing; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; +import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver; +import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AutoClientIDShardClusterTest extends BalancingTestBase { + + @Parameterized.Parameters(name = "protocol: {0}") + public static Collection<Object[]> data() { + final String[] protocols = new String[] {AMQP_PROTOCOL, CORE_PROTOCOL, OPENWIRE_PROTOCOL}; + Collection<Object[]> data = new ArrayList<>(); + for (String protocol : protocols) { + data.add(new Object[] {protocol}); + } + return data; + } + + private final String protocol; + final int numMessages = 50; + AtomicInteger toSend = new AtomicInteger(numMessages); + + public AutoClientIDShardClusterTest(String protocol) { + this.protocol = protocol; + } + + protected void setupServers() throws Exception { + for (int i = 0; i < 2; i++) { + setupLiveServer(i, true, HAType.SharedNothingReplication, true, false); + servers[i].addProtocolManagerFactory(new ProtonProtocolManagerFactory()); + servers[i].addProtocolManagerFactory(new OpenWireProtocolManagerFactory()); + } + setupClusterConnection("cluster0", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1); + setupClusterConnection("cluster1", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0); + toSend.set(numMessages); + } + + Runnable producer = new Runnable() { + final AtomicInteger producerSeq = new AtomicInteger(); + + @Override + public void run() { + while (toSend.get() > 0) { + try { + ConnectionFactory connectionFactory = createFactory(protocol, "producer", "admin", "admin"); + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + try (Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + javax.jms.Topic topic = session.createTopic(name.getMethodName()); + try (MessageProducer producer = session.createProducer(topic)) { + for (int i = 0; i < 10 && toSend.get() > 0; i++) { + Message message = session.createTextMessage(); + message.setIntProperty("SEQ", producerSeq.get() + 1); + producer.send(message); + producerSeq.incrementAndGet(); + toSend.decrementAndGet(); + } + TimeUnit.MILLISECONDS.sleep(100); + } + } + } + } catch (Exception ok) { + } + } + } + }; + + class DurableSub implements Runnable { + + final String id; + int receivedInOrder = -1; + int lastReceived; + int maxReceived; + AtomicBoolean consumerDone = new AtomicBoolean(); + AtomicBoolean orderShot = new AtomicBoolean(); + CountDownLatch registered = new CountDownLatch(1); + + DurableSub(String id) { + this.id = id; + } + + @Override + public void run() { + while (!consumerDone.get()) { + try { + ConnectionFactory connectionFactory = createFactory(protocol, "ClientId-" + id, "admin", "admin"); + Connection connection = null; + try { + connection = connectionFactory.createConnection(); + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + javax.jms.Topic topic = session.createTopic(name.getMethodName()); + try (TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "Sub-" + id)) { + registered.countDown(); + for (int i = 0; i < 5; i++) { + Message message = durableSubscriber.receive(500); + if (message != null) { + lastReceived = message.getIntProperty("SEQ"); + if (lastReceived > maxReceived) { + maxReceived = lastReceived; + } + if (receivedInOrder < 0) { + receivedInOrder = lastReceived; + } else if (receivedInOrder == lastReceived - 1) { + receivedInOrder++; + } else { + if (!orderShot.get()) { + System.err.println("Sub: " + id + ", received: out of order " + lastReceived + ", last in order: " + receivedInOrder); + } + orderShot.set(true); + } + } 
else { + // no point trying again if there is nothing for us now. + break; + } + } + TimeUnit.MILLISECONDS.sleep(500); + } + } + } finally { + if (connection != null) { + connection.close(); // seems openwire not jms2.0 auto closable always + } + } + } catch (Exception ok) { + } + } + } + } + + @Ignore("not totally reliable, but does show the root cause of the problem being solved") + public void testWithoutOutSharding() throws Exception { + setupServers(); + startServers(0, 1); + + // two bouncy durable consumers + DurableSub sub0 = new DurableSub("0"); + DurableSub sub1 = new DurableSub("1"); + + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + executorService.submit(sub0); + executorService.submit(sub1); + + // waiting for registration before production to give bridges a chance + assertTrue(sub0.registered.await(20, TimeUnit.SECONDS)); + assertTrue(sub1.registered.await(20, TimeUnit.SECONDS)); + + assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 2, -1, 10000)); + assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 2, -1, 10000)); + + // wait for remote bindings! 
+ assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 2, -1, 10000)); + assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 2, -1, 10000)); + + // produce a few every second with failover randomize=true so we produce on all nodes + executorService.submit(producer); + + assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0)); + + assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages)); + + assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages)); + + // with bouncing, one 'may' be out of order, hence ignored + assertTrue(sub0.orderShot.get() || sub1.orderShot.get()); + + } finally { + sub0.consumerDone.set(true); + sub1.consumerDone.set(true); + executorService.shutdown(); + stopServers(0, 1); + } + } + + @Test + public void testWithConsistentHashClientIDModTwo() throws Exception { + setupServers(); + + addBalancerWithClientIdConsistentHashMod(); + + startServers(0, 1); + + // two bouncy durable consumers + DurableSub sub0 = new DurableSub("0"); + DurableSub sub1 = new DurableSub("1"); + + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + executorService.submit(sub0); + executorService.submit(sub1); + + // waiting for registration before production to give bridges a chance + assertTrue(sub0.registered.await(5, TimeUnit.SECONDS)); + assertTrue(sub1.registered.await(5, TimeUnit.SECONDS)); + + assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 1, 1, 2000)); + assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 1, 1, 2000)); + + // wait for remote bindings! 
+ assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 1, 1, 10000)); + assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 1, 1, 10000)); + + // produce a few every second with failover randomize=true so we produce on all nodes + executorService.submit(producer); + + assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0)); + + assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages)); + + assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages)); + + // with partition, none will be out of order + assertFalse(sub0.orderShot.get() && sub1.orderShot.get()); + + } finally { + sub0.consumerDone.set(true); + sub1.consumerDone.set(true); + executorService.shutdown(); + stopServers(0, 1); + } + } + + private void addBalancerWithClientIdConsistentHashMod() { + final int numberOfNodes = 2; + for (int node = 0; node < numberOfNodes; node++) { + Configuration configuration = servers[node].getConfiguration(); + BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration().setName(BROKER_BALANCER_NAME); + brokerBalancerConfiguration.setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(TargetKeyResolver.DEFAULT_KEY_VALUE + "|" + node); + NamedPropertyConfiguration transformerConfig = new NamedPropertyConfiguration(); + transformerConfig.setName(ConsistentHashModulo.NAME); + HashMap<String, String> properties = new HashMap<>(); + properties.put(ConsistentHashModulo.MODULO, String.valueOf(numberOfNodes)); + transformerConfig.setProperties(properties); + brokerBalancerConfiguration.setTransformerConfiguration(transformerConfig); + + configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration)); + + TransportConfiguration acceptor = getDefaultServerAcceptor(node); + acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME); + } + } + + protected ConnectionFactory createFactory(String protocol, String clientID, String user, 
String password) throws Exception { + StringBuilder urlBuilder = new StringBuilder(); + + switch (protocol) { + + case CORE_PROTOCOL: { + urlBuilder.append("(tcp://localhost:61616,tcp://localhost:61617)?connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy"); + urlBuilder.append("&clientID="); + urlBuilder.append(clientID); + + return new ActiveMQConnectionFactory(urlBuilder.toString(), user, password); + } + case AMQP_PROTOCOL: { + + urlBuilder.append("failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.randomize=true"); + urlBuilder.append("&jms.clientID="); + urlBuilder.append(clientID); + + return new JmsConnectionFactory(user, password, urlBuilder.toString()); + } + case OPENWIRE_PROTOCOL: { + + urlBuilder.append("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true&maxReconnectAttempts=0&startupMaxReconnectAttempts=0"); + urlBuilder.append("&jms.clientID="); + urlBuilder.append(clientID); + + return new org.apache.activemq.ActiveMQConnectionFactory(user, password, urlBuilder.toString()); + } + default: + throw new IllegalStateException("Unexpected value: " + protocol); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java index 890a813..c626325 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java @@ -27,7 +27,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; -import 
org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration; +import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; @@ -88,7 +88,7 @@ public class BalancingTestBase extends ClusterTestBase { brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter) .setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize) .setLocalTargetEnabled(localTargetEnabled).setClusterConnection(clusterConnection)) - .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties)); + .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties)); configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration)); @@ -105,7 +105,7 @@ public class BalancingTestBase extends ClusterTestBase { brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter) .setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize) .setLocalTargetEnabled(localTargetEnabled).setDiscoveryGroupName("dg1")) - .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties)); + .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties)); configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration)); @@ -129,7 +129,7 @@ public class BalancingTestBase extends ClusterTestBase { brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter) .setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize) 
.setLocalTargetEnabled(localTargetEnabled).setStaticConnectors(staticConnectors)) - .setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties)); + .setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties)); configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration)); @@ -215,7 +215,7 @@ public class BalancingTestBase extends ClusterTestBase { urlBuilder.append(")"); } - urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries); + urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries + "&failover.randomize=true"); if (clientID != null) { urlBuilder.append("&jms.clientID="); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java index 5ccd8c9..446156d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java @@ -88,15 +88,10 @@ public class TargetKeyTest extends BalancingTestBase { @Before public void setup() throws Exception { - PolicyFactoryResolver.getInstance().registerPolicyFactory( + PolicyFactoryResolver.getInstance().registerPolicyFactory(MOCK_POLICY_NAME, new PolicyFactory() { @Override - public String[] getSupportedPolicies() { - return new String[] {MOCK_POLICY_NAME}; - } - - @Override - public Policy createPolicy(String policyName) { + public Policy create() { return new FirstElementPolicy(MOCK_POLICY_NAME) { @Override public Target selectTarget(List<Target> targets, String key) {