This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 07b02159d2 ARTEMIS-4570 filter not applied to all brokers in cluster
07b02159d2 is described below

commit 07b02159d26f15a7f21c63726438cbcccdc746a8
Author: Howard Gao <howard....@gmail.com>
AuthorDate: Mon Jan 15 18:39:14 2024 +0800

    ARTEMIS-4570 filter not applied to all brokers in cluster
---
 .../api/core/management/CoreNotificationType.java  |  3 +-
 .../core/postoffice/impl/PostOfficeImpl.java       | 17 +++++++
 .../core/server/cluster/RemoteQueueBinding.java    |  3 ++
 .../cluster/impl/ClusterConnectionBridge.java      |  2 +
 .../server/cluster/impl/ClusterConnectionImpl.java | 21 +++++++++
 .../cluster/impl/RemoteQueueBindingImpl.java       |  9 +++-
 .../artemis/core/server/impl/QueueImpl.java        |  4 +-
 .../distribution/MessageLoadBalancingTest.java     | 55 ++++++++++++++++++++++
 .../integration/server/ConfigurationTest.java      |  1 +
 .../core/postoffice/impl/BindingsImplTest.java     |  4 ++
 10 files changed, 115 insertions(+), 4 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
index 686400ef7a..a07f8f4dae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
@@ -47,7 +47,8 @@ public enum CoreNotificationType implements NotificationType {
    SESSION_CREATED(26),
    SESSION_CLOSED(27),
    MESSAGE_DELIVERED(28),
-   MESSAGE_EXPIRED(29);
+   MESSAGE_EXPIRED(29),
+   BINDING_UPDATED(30);
 
    private final int value;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index f241a964af..9cf8b61908 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -801,6 +801,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             if ((forceUpdate || newFilter != oldFilter) && !Objects.equals(oldFilter, newFilter)) {
                changed = true;
                queue.setFilter(newFilter);
+               notifyBindingUpdatedForQueue(queueBinding);
             }
             if ((forceUpdate || queueConfiguration.isConfigurationManaged() != null) && !Objects.equals(queueConfiguration.isConfigurationManaged(), queue.isConfigurationManaged())) {
                changed = true;
@@ -836,6 +837,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
    }
 
+   public void notifyBindingUpdatedForQueue(QueueBinding binding) throws Exception {
+      //only the filter could be updated
+      TypedProperties props = new TypedProperties();
+      props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
+      Filter filter = binding.getFilter();
+      if (filter != null) {
+         props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
+      }
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
+      props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+      String uid = UUIDGenerator.getInstance().generateStringUUID();
+      logger.debug("ClusterCommunication::Sending notification for updateBinding {} from server {}", binding, server);
+      managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_UPDATED, props));
+   }
+
    @Override
    public AddressInfo updateAddressInfo(SimpleString addressName,
                                         EnumSet<RoutingType> routingTypes) throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java
index b65d25f319..2c9a7e59de 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/RemoteQueueBinding.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.cluster;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 
@@ -34,5 +35,7 @@ public interface RemoteQueueBinding extends QueueBinding {
 
    long getRemoteQueueID();
 
+   void setFilter(Filter filter);
+
    MessageLoadBalancingType getMessageLoadBalancingType();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index f977c0852f..2bd045500d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -258,6 +258,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                                    "', '" +
                                                    CoreNotificationType.BINDING_REMOVED +
                                                    "', '" +
+                                                   CoreNotificationType.BINDING_UPDATED +
+                                                   "', '" +
                                                    CoreNotificationType.CONSUMER_CREATED +
                                                    "', '" +
                                                    CoreNotificationType.CONSUMER_CLOSED +
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 32d11aa394..98397440ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.client.impl.TopologyManager;
 import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -1134,6 +1135,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
                break;
             }
+            case BINDING_UPDATED: {
+               doBindingUpdated(message);
+               break;
+            }
             case CONSUMER_CREATED: {
                doConsumerCreated(message);
 
@@ -1269,6 +1274,22 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
          }
       }
 
+      private synchronized void doBindingUpdated(final ClientMessage message) throws Exception {
+         logger.trace("{} Update binding {}", ClusterConnectionImpl.this, message);
+         if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
+            throw new IllegalStateException("clusterName is null");
+         }
+
+         SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
+         SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
+
+         RemoteQueueBinding existingBinding = (RemoteQueueBinding) postOffice.getBinding(clusterName);
+
+         if (existingBinding != null) {
+            existingBinding.setFilter(FilterImpl.createFilter(filterString));
+         }
+      }
+
       private synchronized void doBindingAdded(final ClientMessage message) throws Exception {
          logger.trace("{} Adding binding {}", ClusterConnectionImpl.this, message);
          if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 5fcf947bbd..992b909ff8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -50,12 +50,12 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
 
    private final long remoteQueueID;
 
-   private final Filter queueFilter;
-
    private final Set<Filter> filters = new HashSet<>();
 
    private final Map<SimpleString, Integer> filterCounts = new HashMap<>();
 
+   private Filter queueFilter;
+
    private int consumerCount;
 
    private final SimpleString idsHeaderName;
@@ -351,6 +351,11 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
       return remoteQueueID;
    }
 
+   @Override
+   public void setFilter(Filter filter) {
+      this.queueFilter = filter;
+   }
+
    @Override
    public MessageLoadBalancingType getMessageLoadBalancingType() {
       return messageLoadBalancingType;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 50f216ca30..9d4a53fa4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -4322,7 +4322,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          .setTemporary(temporary)
          .setInternal(internalQueue)
          .setTransient(refCountForConsumers instanceof TransientQueueManagerImpl)
-         .setAutoCreated(autoCreated);
+         .setAutoCreated(autoCreated)
+         .setEnabled(enabled)
+         .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch);
    }
 
    protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
index 37c23305d9..ea763069b5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java
@@ -16,9 +16,17 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -86,6 +94,53 @@ public class MessageLoadBalancingTest extends ClusterTestBase {
       Assert.assertNull(clientMessage);
    }
 
+   @Test
+   public void testMessageLoadBalancingWithFiltersUpdate() throws Exception {
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(0, "queues.testaddress", 1, 0, false);
+      waitForBindings(1, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+      Binding[] bindings = new Binding[2];
+      PostOffice[] po = new PostOffice[2];
+      for (int i = 0; i < 2; i++) {
+         po[i] = servers[i].getPostOffice();
+         bindings[i] = po[i].getBinding(new SimpleString("queue0"));
+         Assert.assertNotNull(bindings[i]);
+
+         Queue queue0 = (Queue)bindings[i].getBindable();
+         Assert.assertNotNull(queue0);
+
+         QueueConfiguration qconfig = queue0.getQueueConfiguration();
+         Assert.assertNotNull(qconfig);
+
+         qconfig.setFilterString("color = 'red'");
+         po[i].updateQueue(qconfig, true);
+      }
+
+      SimpleString clusterName0 = bindings[1].getClusterName();
+      RemoteQueueBinding remoteBinding = (RemoteQueueBinding) po[0].getBinding(clusterName0);
+      Assert.assertNotNull(remoteBinding);
+
+      Wait.assertEquals("color = 'red'", () -> {
+         Filter filter = remoteBinding.getFilter();
+         if (filter == null) {
+            return filter;
+         }
+         return filter.getFilterString().toString();
+      });
+   }
+
    protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
       setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java
index 641c9a4008..364e5c47b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConfigurationTest.java
@@ -38,6 +38,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class ConfigurationTest extends ActiveMQTestBase {
+
    @Test
    public void testStartWithDuplicateQueues() throws Exception {
       ActiveMQServer server = getActiveMQServer("duplicate-queues.xml");
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index f833340a0d..6502422af8 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -514,6 +514,10 @@ public class BindingsImplTest extends ActiveMQTestBase {
          return 0;
       }
 
+      @Override
+      public void setFilter(Filter filter) {
+      }
+
       @Override
       public MessageLoadBalancingType getMessageLoadBalancingType() {
          return messageLoadBalancingType;

Reply via email to