This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 07b02159d2 ARTEMIS-4570 filter not applied to all brokers in cluster 07b02159d2 is described below commit 07b02159d26f15a7f21c63726438cbcccdc746a8 Author: Howard Gao <howard....@gmail.com> AuthorDate: Mon Jan 15 18:39:14 2024 +0800 ARTEMIS-4570 filter not applied to all brokers in cluster --- .../api/core/management/CoreNotificationType.java | 3 +- .../core/postoffice/impl/PostOfficeImpl.java | 17 +++++++ .../core/server/cluster/RemoteQueueBinding.java | 3 ++ .../cluster/impl/ClusterConnectionBridge.java | 2 + .../server/cluster/impl/ClusterConnectionImpl.java | 21 +++++++++ .../cluster/impl/RemoteQueueBindingImpl.java | 9 +++- .../artemis/core/server/impl/QueueImpl.java | 4 +- .../distribution/MessageLoadBalancingTest.java | 55 ++++++++++++++++++++++ .../integration/server/ConfigurationTest.java | 1 + .../core/postoffice/impl/BindingsImplTest.java | 4 ++ 10 files changed, 115 insertions(+), 4 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java index 686400ef7a..a07f8f4dae 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java @@ -47,7 +47,8 @@ public enum CoreNotificationType implements NotificationType { SESSION_CREATED(26), SESSION_CLOSED(27), MESSAGE_DELIVERED(28), - MESSAGE_EXPIRED(29); + MESSAGE_EXPIRED(29), + BINDING_UPDATED(30); private final int value; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index f241a964af..9cf8b61908 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -801,6 +801,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if ((forceUpdate || newFilter != oldFilter) && !Objects.equals(oldFilter, newFilter)) { changed = true; queue.setFilter(newFilter); + notifyBindingUpdatedForQueue(queueBinding); } if ((forceUpdate || queueConfiguration.isConfigurationManaged() != null) && !Objects.equals(queueConfiguration.isConfigurationManaged(), queue.isConfigurationManaged())) { changed = true; @@ -836,6 +837,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + public void notifyBindingUpdatedForQueue(QueueBinding binding) throws Exception { + //only the filter could be updated + TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName()); + Filter filter = binding.getFilter(); + if (filter != null) { + props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString()); + } + props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance()); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); + + String uid = UUIDGenerator.getInstance().generateStringUUID(); + logger.debug("ClusterCommunication::Sending notification for updateBinding {} from server {}", binding, server); + managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_UPDATED, props)); + } + @Override public AddressInfo updateAddressInfo(SimpleString addressName, EnumSet<RoutingType> routingTypes) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java index b65d25f319..2c9a7e59de 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.server.cluster; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; @@ -34,5 +35,7 @@ public interface RemoteQueueBinding extends QueueBinding { long getRemoteQueueID(); + void setFilter(Filter filter); + MessageLoadBalancingType getMessageLoadBalancingType(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index f977c0852f..2bd045500d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -258,6 +258,8 @@ public class ClusterConnectionBridge extends BridgeImpl { "', '" + CoreNotificationType.BINDING_REMOVED + "', '" + + CoreNotificationType.BINDING_UPDATED + + "', '" + CoreNotificationType.CONSUMER_CREATED + "', '" + CoreNotificationType.CONSUMER_CLOSED + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 32d11aa394..98397440ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.TopologyManager; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; @@ -1134,6 +1135,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn break; } + case BINDING_UPDATED: { + doBindingUpdated(message); + break; + } case CONSUMER_CREATED: { doConsumerCreated(message); @@ -1269,6 +1274,22 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } } + private synchronized void doBindingUpdated(final ClientMessage message) throws Exception { + logger.trace("{} Update binding {}", ClusterConnectionImpl.this, message); + if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) { + throw new IllegalStateException("clusterName is null"); + } + + SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME); + SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING); + + RemoteQueueBinding existingBinding = (RemoteQueueBinding) postOffice.getBinding(clusterName); + + if (existingBinding != null) { + existingBinding.setFilter(FilterImpl.createFilter(filterString)); + } + } + private synchronized void doBindingAdded(final ClientMessage message) throws Exception { logger.trace("{} Adding binding {}", ClusterConnectionImpl.this, message); if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java index 5fcf947bbd..992b909ff8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java @@ -50,12 +50,12 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { private final long remoteQueueID; - private final Filter queueFilter; - private final Set<Filter> filters = new HashSet<>(); private final Map<SimpleString, Integer> filterCounts = new HashMap<>(); + private Filter queueFilter; + private int consumerCount; private final SimpleString idsHeaderName; @@ -351,6 +351,11 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { return remoteQueueID; } + @Override + public void setFilter(Filter filter) { + this.queueFilter = filter; + } + @Override public MessageLoadBalancingType getMessageLoadBalancingType() { return messageLoadBalancingType; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 50f216ca30..9d4a53fa4e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -4322,7 +4322,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { .setTemporary(temporary) .setInternal(internalQueue) .setTransient(refCountForConsumers instanceof TransientQueueManagerImpl) - .setAutoCreated(autoCreated); + .setAutoCreated(autoCreated) + .setEnabled(enabled) + .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch); } protected static class ConsumerHolder<T extends Consumer> implements PriorityAware { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java index 37c23305d9..ea763069b5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java @@ -16,9 +16,17 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -86,6 +94,53 @@ public class MessageLoadBalancingTest extends ClusterTestBase { Assert.assertNull(clientMessage); } + @Test + public void testMessageLoadBalancingWithFiltersUpdate() throws Exception { + setupCluster(MessageLoadBalancingType.ON_DEMAND); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + + waitForBindings(0, "queues.testaddress", 1, 0, true); + waitForBindings(0, "queues.testaddress", 1, 0, false); + waitForBindings(1, "queues.testaddress", 1, 0, true); + waitForBindings(1, "queues.testaddress", 1, 0, false); + + Binding[] bindings = new Binding[2]; + PostOffice[] po = new PostOffice[2]; + for (int i = 0; i < 2; i++) { + po[i] = servers[i].getPostOffice(); + bindings[i] = po[i].getBinding(new SimpleString("queue0")); + Assert.assertNotNull(bindings[i]); + + Queue queue0 = (Queue)bindings[i].getBindable(); + Assert.assertNotNull(queue0); + + QueueConfiguration qconfig = queue0.getQueueConfiguration(); + Assert.assertNotNull(qconfig); + + qconfig.setFilterString("color = 'red'"); + po[i].updateQueue(qconfig, true); + } + + SimpleString clusterName0 = bindings[1].getClusterName(); + RemoteQueueBinding remoteBinding = (RemoteQueueBinding) po[0].getBinding(clusterName0); + Assert.assertNotNull(remoteBinding); + + Wait.assertEquals("color = 'red'", () -> { + Filter filter = remoteBinding.getFilter(); + if (filter == null) { + return filter; + } + return filter.getFilterString().toString(); + }); + } + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java index 641c9a4008..364e5c47b2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java @@ -38,6 +38,7 @@ import org.junit.Assert; import org.junit.Test; public class ConfigurationTest extends ActiveMQTestBase { + @Test public void testStartWithDuplicateQueues() throws Exception { ActiveMQServer server = getActiveMQServer("duplicate-queues.xml"); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index f833340a0d..6502422af8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -514,6 +514,10 @@ public class BindingsImplTest extends ActiveMQTestBase { return 0; } + @Override + public void setFilter(Filter filter) { + } + @Override public MessageLoadBalancingType getMessageLoadBalancingType() { return messageLoadBalancingType;