This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 07b02159d2 ARTEMIS-4570 filter not applied to all brokers in cluster
07b02159d2 is described below
commit 07b02159d26f15a7f21c63726438cbcccdc746a8
Author: Howard Gao <[email protected]>
AuthorDate: Mon Jan 15 18:39:14 2024 +0800
ARTEMIS-4570 filter not applied to all brokers in cluster
---
.../api/core/management/CoreNotificationType.java | 3 +-
.../core/postoffice/impl/PostOfficeImpl.java | 17 +++++++
.../core/server/cluster/RemoteQueueBinding.java | 3 ++
.../cluster/impl/ClusterConnectionBridge.java | 2 +
.../server/cluster/impl/ClusterConnectionImpl.java | 21 +++++++++
.../cluster/impl/RemoteQueueBindingImpl.java | 9 +++-
.../artemis/core/server/impl/QueueImpl.java | 4 +-
.../distribution/MessageLoadBalancingTest.java | 55 ++++++++++++++++++++++
.../integration/server/ConfigurationTest.java | 1 +
.../core/postoffice/impl/BindingsImplTest.java | 4 ++
10 files changed, 115 insertions(+), 4 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
index 686400ef7a..a07f8f4dae 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
@@ -47,7 +47,8 @@ public enum CoreNotificationType implements NotificationType {
SESSION_CREATED(26),
SESSION_CLOSED(27),
MESSAGE_DELIVERED(28),
- MESSAGE_EXPIRED(29);
+ MESSAGE_EXPIRED(29),
+ BINDING_UPDATED(30);
private final int value;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index f241a964af..9cf8b61908 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -801,6 +801,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
if ((forceUpdate || newFilter != oldFilter) &&
!Objects.equals(oldFilter, newFilter)) {
changed = true;
queue.setFilter(newFilter);
+ notifyBindingUpdatedForQueue(queueBinding);
}
if ((forceUpdate || queueConfiguration.isConfigurationManaged() !=
null) && !Objects.equals(queueConfiguration.isConfigurationManaged(),
queue.isConfigurationManaged())) {
changed = true;
@@ -836,6 +837,22 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
+ public void notifyBindingUpdatedForQueue(QueueBinding binding) throws
Exception {
+ //only the filter could be updated
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME,
binding.getClusterName());
+ Filter filter = binding.getFilter();
+ if (filter != null) {
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
filter.getFilterString());
+ }
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE,
binding.getDistance());
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS,
binding.getAddress());
+
+ String uid = UUIDGenerator.getInstance().generateStringUUID();
+ logger.debug("ClusterCommunication::Sending notification for
updateBinding {} from server {}", binding, server);
+ managementService.sendNotification(new Notification(uid,
CoreNotificationType.BINDING_UPDATED, props));
+ }
+
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
EnumSet<RoutingType> routingTypes)
throws Exception {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java
index b65d25f319..2c9a7e59de 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@@ -34,5 +35,7 @@ public interface RemoteQueueBinding extends QueueBinding {
long getRemoteQueueID();
+ void setFilter(Filter filter);
+
MessageLoadBalancingType getMessageLoadBalancingType();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index f977c0852f..2bd045500d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -258,6 +258,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
"', '" +
CoreNotificationType.BINDING_REMOVED +
"', '" +
+
CoreNotificationType.BINDING_UPDATED +
+ "', '" +
CoreNotificationType.CONSUMER_CREATED +
"', '" +
CoreNotificationType.CONSUMER_CLOSED +
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 32d11aa394..98397440ff 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyManager;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -1134,6 +1135,10 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
break;
}
+ case BINDING_UPDATED: {
+ doBindingUpdated(message);
+ break;
+ }
case CONSUMER_CREATED: {
doConsumerCreated(message);
@@ -1269,6 +1274,22 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
}
}
+ private synchronized void doBindingUpdated(final ClientMessage message)
throws Exception {
+ logger.trace("{} Update binding {}", ClusterConnectionImpl.this,
message);
+ if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
+ throw new IllegalStateException("clusterName is null");
+ }
+
+ SimpleString clusterName =
message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ SimpleString filterString =
message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
+
+ RemoteQueueBinding existingBinding = (RemoteQueueBinding)
postOffice.getBinding(clusterName);
+
+ if (existingBinding != null) {
+ existingBinding.setFilter(FilterImpl.createFilter(filterString));
+ }
+ }
+
private synchronized void doBindingAdded(final ClientMessage message)
throws Exception {
logger.trace("{} Adding binding {}", ClusterConnectionImpl.this,
message);
if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 5fcf947bbd..992b909ff8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -50,12 +50,12 @@ public class RemoteQueueBindingImpl implements
RemoteQueueBinding {
private final long remoteQueueID;
- private final Filter queueFilter;
-
private final Set<Filter> filters = new HashSet<>();
private final Map<SimpleString, Integer> filterCounts = new HashMap<>();
+ private Filter queueFilter;
+
private int consumerCount;
private final SimpleString idsHeaderName;
@@ -351,6 +351,11 @@ public class RemoteQueueBindingImpl implements
RemoteQueueBinding {
return remoteQueueID;
}
+ @Override
+ public void setFilter(Filter filter) {
+ this.queueFilter = filter;
+ }
+
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 50f216ca30..9d4a53fa4e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -4322,7 +4322,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
.setTemporary(temporary)
.setInternal(internalQueue)
.setTransient(refCountForConsumers instanceof
TransientQueueManagerImpl)
- .setAutoCreated(autoCreated);
+ .setAutoCreated(autoCreated)
+ .setEnabled(enabled)
+ .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch);
}
protected static class ConsumerHolder<T extends Consumer> implements
PriorityAware {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
index 37c23305d9..ea763069b5 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
@@ -16,9 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -86,6 +94,53 @@ public class MessageLoadBalancingTest extends
ClusterTestBase {
Assert.assertNull(clientMessage);
}
+ @Test
+ public void testMessageLoadBalancingWithFiltersUpdate() throws Exception {
+ setupCluster(MessageLoadBalancingType.ON_DEMAND);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+ Binding[] bindings = new Binding[2];
+ PostOffice[] po = new PostOffice[2];
+ for (int i = 0; i < 2; i++) {
+ po[i] = servers[i].getPostOffice();
+ bindings[i] = po[i].getBinding(new SimpleString("queue0"));
+ Assert.assertNotNull(bindings[i]);
+
+ Queue queue0 = (Queue)bindings[i].getBindable();
+ Assert.assertNotNull(queue0);
+
+ QueueConfiguration qconfig = queue0.getQueueConfiguration();
+ Assert.assertNotNull(qconfig);
+
+ qconfig.setFilterString("color = 'red'");
+ po[i].updateQueue(qconfig, true);
+ }
+
+ SimpleString clusterName0 = bindings[1].getClusterName();
+ RemoteQueueBinding remoteBinding = (RemoteQueueBinding)
po[0].getBinding(clusterName0);
+ Assert.assertNotNull(remoteBinding);
+
+ Wait.assertEquals("color = 'red'", () -> {
+ Filter filter = remoteBinding.getFilter();
+ if (filter == null) {
+ return filter;
+ }
+ return filter.getFilterString().toString();
+ });
+ }
+
protected void setupCluster(final MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
setupClusterConnection("cluster0", "queues", messageLoadBalancingType,
1, isNetty(), 0, 1);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java
index 641c9a4008..364e5c47b2 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java
@@ -38,6 +38,7 @@ import org.junit.Assert;
import org.junit.Test;
public class ConfigurationTest extends ActiveMQTestBase {
+
@Test
public void testStartWithDuplicateQueues() throws Exception {
ActiveMQServer server = getActiveMQServer("duplicate-queues.xml");
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index f833340a0d..6502422af8 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -514,6 +514,10 @@ public class BindingsImplTest extends ActiveMQTestBase {
return 0;
}
+ @Override
+ public void setFilter(Filter filter) {
+ }
+
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;