This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-6.2.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-6.2.x by this push:
     new 15e78425b2 Send advisory messages using Broker connection context 
(#2071) (#2075)
15e78425b2 is described below

commit 15e78425b2589efac8d47e1d5c54a3691e1f12bd
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Fri Jun 5 12:53:20 2026 -0400

    Send advisory messages using Broker connection context (#2071) (#2075)
    
    This updates the AdvisoryBroker to always publish advisory messages
    that were generated by other events to use the Broker's own
    ConnectionContext. Before this change the AdvisoryBroker was
    using the original ConnectionContext that was used for the action that
    triggered the advisory. This doesn't make sense because it's actually the
    broker itself firing the advisory message and not the original
    connection. It also meant requiring all users to be given access to
    create new advisory topics that could be created on demand.
    
    After this update, users no longer need permission to create
    advisory destinations, which was required previously. Users only need
    read access to the temporary destination advisories for the AMQ client
    as the broker itself will now use its own context going forward to
    create all the destinations on demand and for publishing.
    
    This update also consolidates the message with no consumers advisory
    into the AdvisoryBroker so it is all managed in one location.
    
    (cherry picked from commit 3598fc562337c467b3f6b59364b29ac2d9bdb068)
---
 .../apache/activemq/advisory/AdvisoryBroker.java   | 134 ++++++++++++++++-----
 .../java/org/apache/activemq/broker/Broker.java    |   8 ++
 .../org/apache/activemq/broker/BrokerFilter.java   |   5 +
 .../org/apache/activemq/broker/BrokerService.java  |  16 ++-
 .../org/apache/activemq/broker/EmptyBroker.java    |   5 +
 .../org/apache/activemq/broker/ErrorBroker.java    |   5 +
 .../activemq/broker/region/BaseDestination.java    |  50 +-------
 .../activemq/broker/util/LoggingBrokerPlugin.java  |   9 ++
 .../network/DemandForwardingBridgeSupport.java     |  12 +-
 .../transport/mqtt/MQTTAuthTestSupport.java        |  12 +-
 .../activemq/transport/stomp/StompTestSupport.java |  12 +-
 .../apache/activemq/advisory/AdvisoryTests.java    |  36 +++++-
 .../activemq/broker/policy/SecureDLQTest.java      |   4 +-
 .../security/DestinationAdminAuthzTest.java        |   9 +-
 .../security/SimpleSecurityBrokerSystemTest.java   |   4 +-
 .../security/jaas-broker-guest-no-creds-only.xml   |   9 +-
 .../apache/activemq/security/jaas-broker-guest.xml |   9 +-
 .../org/apache/activemq/security/jaas-broker.xml   |  11 +-
 assembly/src/release/conf/activemq.xml             |  27 +++--
 19 files changed, 256 insertions(+), 121 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 38cd49bab0..5eda70dc24 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -23,9 +23,11 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.broker.Broker;
@@ -61,7 +63,6 @@ import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.filter.DestinationPath;
-import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.IdGenerator;
@@ -79,6 +80,7 @@ public class AdvisoryBroker extends BrokerFilter {
     private static final Logger LOG = 
LoggerFactory.getLogger(AdvisoryBroker.class);
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
 
+    protected final AtomicReference<ConnectionContext> 
advisoryConnectionContext = new AtomicReference<>();
     protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = 
new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
 
     private final ReentrantReadWriteLock consumersLock = new 
ReentrantReadWriteLock();
@@ -107,13 +109,40 @@ public class AdvisoryBroker extends BrokerFilter {
 
     private final LongSequenceGenerator messageIdGenerator = new 
LongSequenceGenerator();
 
-    private VirtualDestinationMatcher virtualDestinationMatcher = new 
DestinationFilterVirtualDestinationMatcher();
+    private final VirtualDestinationMatcher virtualDestinationMatcher = new 
DestinationFilterVirtualDestinationMatcher();
 
     public AdvisoryBroker(Broker next) {
         super(next);
         advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
     }
 
+    @Override
+    public void setAdminConnectionContext(ConnectionContext 
adminConnectionContext) {
+        super.setAdminConnectionContext(adminConnectionContext);
+        // Create a copy of the adminConnection context and set flow control 
false
+        // This will be used to publish all advisories. This will be called
+        // during broker construction and before the first advisories are sent.
+        ConnectionContext connectionContext = adminConnectionContext.copy();
+        connectionContext.setProducerFlowControl(false);
+        this.advisoryConnectionContext.set(connectionContext);
+    }
+
+    @Override
+    public void start() throws Exception {
+        super.start();
+        // Sanity check to make sure we setAdminConnectionContext() was called 
and
+        // we initialized the admin context
+        if (advisoryConnectionContext.get() == null) {
+            throw new IllegalArgumentException("AdminConnectionContext was not 
initialized");
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        super.stop();
+        this.advisoryConnectionContext.set(null);
+    }
+
     @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) 
throws Exception {
         super.addConnection(context, info);
@@ -538,6 +567,46 @@ public class AdvisoryBroker extends BrokerFilter {
         }
     }
 
+    @Override
+    public void messageNoConsumers(ConnectionContext context, MessageReference 
messageReference) {
+        super.messageNoConsumers(context, messageReference);
+        try {
+            if (!messageReference.isAdvisory()) {
+                // allow messages with no consumers to be dispatched to a dead
+                // letter queue
+                BaseDestination baseDestination = (BaseDestination) 
messageReference.getMessage().getRegionDestination();
+                ActiveMQDestination destination = 
baseDestination.getActiveMQDestination();
+                if (destination.isQueue() || 
!AdvisorySupport.isAdvisoryTopic(destination)) {
+
+                    Message message = messageReference.getMessage().copy();
+                    // The original destination and transaction id do not get
+                    // filled when the message is first sent,
+                    // it is only populated if the message is routed to another
+                    // destination like the DLQ
+                    if (message.getOriginalDestination() != null) {
+                        
message.setOriginalDestination(message.getDestination());
+                    }
+                    if (message.getOriginalTransactionId() != null) {
+                        
message.setOriginalTransactionId(message.getTransactionId());
+                    }
+
+                    ActiveMQTopic advisoryTopic;
+                    if (destination.isQueue()) {
+                        advisoryTopic = 
AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
+                    } else {
+                        advisoryTopic = 
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
+                    }
+                    message.setDestination(advisoryTopic);
+                    message.setTransactionId(null);
+
+                    context.getBroker().send(newAdvisoryProducerExchange(), 
message);
+                }
+            }
+        } catch (Exception e) {
+            handleFireFailure("discarded", e);
+        }
+    }
+
     @Override
     public void slowConsumer(ConnectionContext context, Destination 
destination, Subscription subs) {
         super.slowConsumer(context, destination, subs);
@@ -775,10 +844,7 @@ public class AdvisoryBroker extends BrokerFilter {
         try {
             ActiveMQTopic topic = 
AdvisorySupport.getMasterBrokerAdvisoryTopic();
             ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-            ConnectionContext context = new ConnectionContext();
-            
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-            context.setBroker(getBrokerService().getBroker());
-            fireAdvisory(context, topic, null, null, advisoryMessage);
+            fireAdvisory(topic, null, advisoryMessage);
         } catch (Exception e) {
             handleFireFailure("now master broker", e);
         }
@@ -817,12 +883,7 @@ public class AdvisoryBroker extends BrokerFilter {
                 advisoryMessage.setStringProperty("remoteIp", remoteIp);
                 networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
 
-                ActiveMQTopic topic = 
AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
-                ConnectionContext context = new ConnectionContext();
-                
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-                context.setBroker(getBrokerService().getBroker());
-                fireAdvisory(context, topic, brokerInfo, null, 
advisoryMessage);
+                fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(), 
brokerInfo, advisoryMessage);
             }
         } catch (Exception e) {
             handleFireFailure("network bridge started", e);
@@ -837,12 +898,7 @@ public class AdvisoryBroker extends BrokerFilter {
                 advisoryMessage.setBooleanProperty("started", false);
                 networkBridges.remove(brokerInfo);
 
-                ActiveMQTopic topic = 
AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
-                ConnectionContext context = new ConnectionContext();
-                
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-                context.setBroker(getBrokerService().getBroker());
-                fireAdvisory(context, topic, brokerInfo, null, 
advisoryMessage);
+                fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(), 
brokerInfo, advisoryMessage);
             }
         } catch (Exception e) {
             handleFireFailure("network bridge stopped", e);
@@ -899,7 +955,19 @@ public class AdvisoryBroker extends BrokerFilter {
         fireAdvisory(context, topic, command, targetConsumerId, 
advisoryMessage);
     }
 
-    public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, 
Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) 
throws Exception {
+    public void fireFailedForwardAdvisory(Message message, Throwable error) 
throws Exception {
+        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+        advisoryMessage.setStringProperty("cause", 
error.getLocalizedMessage());
+
+        
fireAdvisory(AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), 
message, advisoryMessage);
+    }
+
+    private void fireAdvisory(ActiveMQTopic topic, Command command, 
ActiveMQMessage advisoryMessage) throws Exception {
+        fireAdvisory(advisoryConnectionContext.get(), topic, command, null, 
advisoryMessage);
+    }
+
+    private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, 
Command command,
+            ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) 
throws Exception {
         //set properties
         
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME,
 getBrokerName());
         String id = getBrokerId() != null ? getBrokerId().getValue() : 
"NOT_SET";
@@ -925,17 +993,11 @@ public class AdvisoryBroker extends BrokerFilter {
         advisoryMessage.setDestination(topic);
         advisoryMessage.setResponseRequired(false);
         advisoryMessage.setProducerId(advisoryProducerId);
-        boolean originalFlowControl = context.isProducerFlowControl();
-        final ProducerBrokerExchange producerExchange = new 
ProducerBrokerExchange();
-        producerExchange.setConnectionContext(context);
-        producerExchange.setMutable(true);
-        producerExchange.setProducerState(new ProducerState(new 
ProducerInfo()));
-        try {
-            context.setProducerFlowControl(false);
-            next.send(producerExchange, advisoryMessage);
-        } finally {
-            context.setProducerFlowControl(originalFlowControl);
-        }
+
+        // The advisory messages are generated the broker itself, so this send 
will
+        // publish the advisory message using the Broker ConnectionContext so 
there will
+        // be admin permissions granted.
+        next.send(newAdvisoryProducerExchange(), advisoryMessage);
     }
 
     public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
@@ -963,7 +1025,7 @@ public class AdvisoryBroker extends BrokerFilter {
         return virtualDestinationConsumers;
     }
 
-    private class VirtualConsumerPair {
+    protected class VirtualConsumerPair {
         private final VirtualDestination virtualDestination;
 
         //destination that matches this virtualDestination as part target
@@ -1028,4 +1090,14 @@ public class AdvisoryBroker extends BrokerFilter {
             return AdvisoryBroker.this;
         }
     }
+
+    protected ProducerBrokerExchange newAdvisoryProducerExchange() {
+        final ProducerBrokerExchange producerExchange = new 
ProducerBrokerExchange();
+        
producerExchange.setConnectionContext(Objects.requireNonNull(advisoryConnectionContext.get(),
+                "Advisory ConnectionContext must not be null"));
+        producerExchange.setMutable(true);
+        producerExchange.setProducerState(new ProducerState(new 
ProducerInfo()));
+        return producerExchange;
+    }
+
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
index b96167c5ef..0462b76cfa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
@@ -370,6 +370,14 @@ public interface Broker extends Region, Service {
      */
     void messageDiscarded(ConnectionContext context, Subscription sub, 
MessageReference messageReference);
 
+    /**
+     * Called when a message is processed with no consumers
+     *
+     * @param context connection context
+     * @param messageReference message reference
+     */
+    void messageNoConsumers(ConnectionContext context, MessageReference 
messageReference);
+
     /**
      * Called when there is a slow consumer
      * @param context connection context
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
index b9374e352d..c0c5d05d63 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
@@ -361,6 +361,11 @@ public class BrokerFilter implements Broker {
         getNext().messageDiscarded(context, sub, messageReference);
     }
 
+    @Override
+    public void messageNoConsumers(ConnectionContext context, MessageReference 
messageReference) {
+        getNext().messageNoConsumers(context, messageReference);
+    }
+
     @Override
     public void slowConsumer(ConnectionContext context, Destination 
destination,Subscription subs) {
         getNext().slowConsumer(context, destination,subs);
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 99404db5dc..4cb353ba3c 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -733,8 +733,18 @@ public class BrokerService implements Service {
         }
     }
 
+    // Ensure the broker chain is fully initialized and we create the admin 
connection.
+    // The admin connection is needed to create destinations and for the 
AdvisoryBroker
+    // before broker startup. Creating the connection will also call the
+    // setAdminConnectionContext() callback on the Broker chain. This ensures 
initialization
+    // is done correctly even if someone overrides getAdminConnectionContext();
+    private void initializeAdminConnection() throws Exception {
+        BrokerSupport.getConnectionContext(getBroker());
+    }
+
     private void doStartBroker() throws Exception {
         checkStartException();
+        initializeAdminConnection();
         startDestinations();
         addShutdownHook();
 
@@ -2447,7 +2457,7 @@ public class BrokerService implements Service {
     protected Broker addInterceptors(Broker broker) throws Exception {
         if (isAdvisorySupport()) {
             // AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker
-            broker = new AdvisoryBroker(broker);
+            broker = createAdvisoryBroker(broker);
         }
         if (isSchedulerSupport()) {
             SchedulerBroker sb = new SchedulerBroker(this, broker, 
getJobSchedulerStore());
@@ -2515,6 +2525,10 @@ public class BrokerService implements Service {
         }
     }
 
+    protected AdvisoryBroker createAdvisoryBroker(Broker broker) {
+        return new AdvisoryBroker(broker);
+    }
+
     protected ObjectName createBrokerObjectName() throws 
MalformedObjectNameException  {
         return 
BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(),
 getBrokerName());
     }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
index 4872a5a0fa..7bd65fcc71 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
@@ -317,6 +317,11 @@ public class EmptyBroker implements Broker {
     public void messageDiscarded(ConnectionContext context, Subscription sub, 
MessageReference messageReference) {
     }
 
+    @Override
+    public void messageNoConsumers(ConnectionContext context, MessageReference 
messageReference) {
+
+    }
+
     @Override
     public void slowConsumer(ConnectionContext context,Destination 
destination, Subscription subs) {
     }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
index 8c138e3945..ccfbc9daa8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
@@ -358,6 +358,11 @@ public class ErrorBroker implements Broker {
         throw new BrokerStoppedException(this.message);
     }
 
+    @Override
+    public void messageNoConsumers(ConnectionContext context, MessageReference 
messageReference) {
+        throw new BrokerStoppedException(this.message);
+    }
+
     @Override
     public void slowConsumer(ConnectionContext context, Destination 
destination,Subscription subs) {
         throw new BrokerStoppedException(this.message);
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 82ed8784a2..6aaa317fe4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import jakarta.jms.ResourceAllocationException;
 
-import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -643,50 +642,11 @@ public abstract class BaseDestination implements 
Destination {
      * Provides a hook to allow messages with no consumer to be processed in
      * some way - such as to send to a dead letter queue or something..
      */
-    protected void onMessageWithNoConsumers(ConnectionContext context, Message 
msg) throws Exception {
-        if (!msg.isPersistent()) {
-            if (isSendAdvisoryIfNoConsumers()) {
-                // allow messages with no consumers to be dispatched to a dead
-                // letter queue
-                if (destination.isQueue() || 
!AdvisorySupport.isAdvisoryTopic(destination)) {
-
-                    Message message = msg.copy();
-                    // The original destination and transaction id do not get
-                    // filled when the message is first sent,
-                    // it is only populated if the message is routed to another
-                    // destination like the DLQ
-                    if (message.getOriginalDestination() != null) {
-                        
message.setOriginalDestination(message.getDestination());
-                    }
-                    if (message.getOriginalTransactionId() != null) {
-                        
message.setOriginalTransactionId(message.getTransactionId());
-                    }
-
-                    ActiveMQTopic advisoryTopic;
-                    if (destination.isQueue()) {
-                        advisoryTopic = 
AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
-                    } else {
-                        advisoryTopic = 
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
-                    }
-                    message.setDestination(advisoryTopic);
-                    message.setTransactionId(null);
-
-                    // Disable flow control for this since since we don't want
-                    // to block.
-                    boolean originalFlowControl = 
context.isProducerFlowControl();
-                    try {
-                        context.setProducerFlowControl(false);
-                        ProducerBrokerExchange producerExchange = new 
ProducerBrokerExchange();
-                        producerExchange.setMutable(false);
-                        producerExchange.setConnectionContext(context);
-                        producerExchange.setProducerState(new 
ProducerState(new ProducerInfo()));
-                        context.getBroker().send(producerExchange, message);
-                    } finally {
-                        context.setProducerFlowControl(originalFlowControl);
-                    }
-
-                }
-            }
+    protected void onMessageWithNoConsumers(ConnectionContext context, Message 
msg) {
+        if (!msg.isPersistent() && isSendAdvisoryIfNoConsumers()) {
+            // allow messages with no consumers to be dispatched to a dead
+            // letter queue
+            broker.messageNoConsumers(context, msg);
         }
     }
 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
index 798f5644cb..72c6f352d2 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
@@ -559,6 +559,15 @@ public class LoggingBrokerPlugin extends 
BrokerPluginSupport {
         super.messageDiscarded(context, sub, messageReference);
     }
 
+    @Override
+    public void messageNoConsumers(ConnectionContext context, MessageReference 
messageReference) {
+        if (isLogAll() || isLogInternalEvents()) {
+            String msg = messageReference.getMessage().toString();
+            LOG.info("Message without consumers: {}", msg);
+        }
+        super.messageNoConsumers(context, messageReference);
+    }
+
     @Override
     public void slowConsumer(ConnectionContext context, Destination 
destination, Subscription subs) {
         if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 8d16445fb9..26a5769ed0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1152,18 +1152,10 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                 advisoryBroker = (AdvisoryBroker) 
brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
 
                 if (advisoryBroker != null) {
-                    ConnectionContext context = new ConnectionContext();
-                    
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-                    context.setBroker(brokerService.getBroker());
-
-                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-                    advisoryMessage.setStringProperty("cause", 
error.getLocalizedMessage());
-                    advisoryBroker.fireAdvisory(context, 
AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), 
messageDispatch.getMessage(), null,
-                            advisoryMessage);
-
+                    
advisoryBroker.fireFailedForwardAdvisory(messageDispatch.getMessage(), error);
                 }
             } catch (Exception e) {
-                LOG.warn("failed to fire forward failure advisory, cause: {}", 
e);
+                LOG.warn("failed to fire forward failure advisory, cause: {}", 
e.getMessage());
                 LOG.debug("detail", e);
             }
         }
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
index 6e327bfe53..11de298419 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
@@ -103,10 +103,16 @@ public class MQTTAuthTestSupport extends MQTTTestSupport {
         entry.setAdmin("guests,users,anonymous");
         authorizationEntries.add(entry);
         entry = new AuthorizationEntry();
-        entry.setTopic("ActiveMQ.Advisory.>");
+        entry.setTopic("ActiveMQ.Advisory.TempQueue");
         entry.setRead("guests,users,anonymous");
-        entry.setWrite("guests,users,anonymous");
-        entry.setAdmin("guests,users,anonymous");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("ActiveMQ.Advisory.TempTopic");
+        entry.setRead("guests,users,anonymous");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
         authorizationEntries.add(entry);
 
         TempDestinationAuthorizationEntry tempEntry = new 
TempDestinationAuthorizationEntry();
diff --git 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
index ecdeae5833..e155599d2d 100644
--- 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
+++ 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
@@ -292,10 +292,16 @@ public class StompTestSupport {
         entry.setAdmin("guests,users");
         authorizationEntries.add(entry);
         entry = new AuthorizationEntry();
-        entry.setTopic("ActiveMQ.Advisory.>");
+        entry.setTopic("ActiveMQ.Advisory.TempQueue");
         entry.setRead("guests,users");
-        entry.setWrite("guests,users");
-        entry.setAdmin("guests,users");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("ActiveMQ.Advisory.TempTopic");
+        entry.setRead("guests,users");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
         authorizationEntries.add(entry);
 
         TempDestinationAuthorizationEntry tempEntry = new 
TempDestinationAuthorizationEntry();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 8b78f6849f..40d09c35e7 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -19,12 +19,14 @@ package org.apache.activemq.advisory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import jakarta.jms.BytesMessage;
 import jakarta.jms.Connection;
@@ -45,6 +47,8 @@ import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import 
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -53,6 +57,7 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.security.SecurityContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -687,7 +692,35 @@ public class AdvisoryTests {
     }
 
     protected BrokerService createBroker() throws Exception {
-        BrokerService answer = new BrokerService();
+        BrokerService answer = new BrokerService() {
+            // Wrap the broker used by the Advisory broker so we can intercept 
the send()
+            // calls and verify any messages published to advisory topics by 
the advisory broker
+            // use the right context. The AdvisoryBroker delegates to the 
"next" broker in the
+            // chain when sending generated advisories.
+            @Override
+            protected AdvisoryBroker createAdvisoryBroker(Broker broker) {
+                return new AdvisoryBroker(new BrokerFilter(broker) {
+                    // track first connection context used for advisories
+                    private final AtomicReference<ConnectionContext> first = 
new AtomicReference<>();
+
+                    @Override
+                    public void send(ProducerBrokerExchange producerExchange,
+                            org.apache.activemq.command.Message messageSend) 
throws Exception {
+                        super.send(producerExchange, messageSend);
+                        // Verify all advisory topic publishes use the broker's 
own context.
+                        // This filter is only used by the advisory broker, so 
all published
+                        // advisories should use the broker security context.
+                        if 
(AdvisorySupport.isAdvisoryTopic(messageSend.getDestination())) {
+                            first.compareAndSet(null, 
producerExchange.getConnectionContext());
+                            
assertEquals(SecurityContext.BROKER_SECURITY_CONTEXT,
+                                    
producerExchange.getConnectionContext().getSecurityContext());
+                            // ConnectionContext is reused for each message 
(but producer exchange is not)
+                            assertSame(first.get(), 
producerExchange.getConnectionContext());
+                        }
+                    }
+                });
+            }
+        };
         configureBroker(answer);
         answer.start();
         return answer;
@@ -742,6 +775,7 @@ public class AdvisoryTests {
 
                         super.preProcessDispatch(messageDispatch);
                     }
+
                 };
             }
         } });
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
index 2ef59938e4..5a83787a7b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
@@ -43,8 +43,8 @@ public class SecureDLQTest extends DeadLetterTestSupport {
         writeAccess.put(new ActiveMQQueue("TEST"), USERS);
         writeAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
 
-        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
-        writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), 
WILDCARD);
+        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), 
WILDCARD);
 
         DestinationMap adminAccess = new DefaultAuthorizationMap();
         adminAccess.put(new ActiveMQQueue("TEST"), ADMINS);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
index a06861fa0c..08366af042 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
@@ -269,9 +269,12 @@ public class DestinationAdminAuthzTest {
         adminAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP);
         adminAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP);
 
-        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
-        writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
-        adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), 
WILDCARD);
+        writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), 
ADMINS);
+        adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), 
ADMINS);
+        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), 
WILDCARD);
+        writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), 
ADMINS);
+        adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), 
ADMINS);
 
         var authorizationMap = new SimpleAuthorizationMap(writeAccess, 
readAccess, adminAccess);
         var tempDestinationAuthorizationEntry = new 
TempDestinationAuthorizationEntry();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
index fc9dc80073..fe237c7940 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
@@ -130,8 +130,8 @@ public class SimpleSecurityBrokerSystemTest extends 
SecurityTestSupport {
         writeAccess.put(new ActiveMQTopic("GUEST.>"), USERS);
         writeAccess.put(new ActiveMQTopic("GUEST.>"), GUESTS);
 
-        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
-        writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), 
WILDCARD);
+        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), 
WILDCARD);
 
         DestinationMap adminAccess = new DefaultAuthorizationMap();
         adminAccess.put(new ActiveMQTopic(">"), ADMINS);
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
index 6ad1e72c2f..4c171a05b2 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
@@ -41,7 +41,12 @@
              <authorizationEntry queue="&gt;" read="admins" write="admins" 
admin="admins"/>
              <authorizationEntry topic="&gt;" read="admins" write="admins" 
admin="admins"/>
              <authorizationEntry queue="GuestQueue" read="admins" 
write="admins, guests" admin="admins"/>
-             <authorizationEntry topic="ActiveMQ.Advisory.&gt;" read="guests" 
write="guests" admin="guests"/>
+              <!-- Grant only admins the ability to create/delete advisory 
destinations -->
+              <authorizationEntry topic="ActiveMQ.Advisory.&gt;" read="admins" 
write="admins" admin="admins"/>
+
+              <!-- All users need read access to temporary destination 
advisories -->
+              <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*" 
write="admin" admin="admin"/>
+              <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*" 
write="admin" admin="admin"/>
             </authorizationEntries>    
           </authorizationMap>
         </map>
@@ -53,4 +58,4 @@
        </transportConnectors>
   </broker>
 
-</beans>
\ No newline at end of file
+</beans>
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
index c8474a349e..c2d10ba5dd 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
@@ -41,7 +41,12 @@
              <authorizationEntry queue="&gt;" read="admins" write="admins" 
admin="admins"/>
              <authorizationEntry topic="&gt;" read="admins" write="admins" 
admin="admins"/>
              <authorizationEntry queue="GuestQueue" read="admins" 
write="admins, guests" admin="admins"/>
-             <authorizationEntry topic="ActiveMQ.Advisory.&gt;" read="guests" 
write="guests" admin="guests"/>
+              <!-- Grant only admins the ability to create/delete advisory 
destinations -->
+              <authorizationEntry topic="ActiveMQ.Advisory.&gt;" read="admins" 
write="admins" admin="admins"/>
+
+              <!-- All users need read access to temporary destination 
advisories -->
+              <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*" 
write="admin" admin="admin"/>
+              <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*" 
write="admin" admin="admin"/>
             </authorizationEntries>    
           </authorizationMap>
         </map>
@@ -53,4 +58,4 @@
        </transportConnectors>
   </broker>
 
-</beans>
\ No newline at end of file
+</beans>
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
index 246921584f..a30f17aa41 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
@@ -46,13 +46,12 @@
               <authorizationEntry topic="USERS.>" read="users" write="users" 
admin="users" />
               <authorizationEntry topic="GUEST.>" read="guests" 
write="guests,users" admin="guests,users" />
 
-              <!-- Grant all users the ability to create/delete advisory 
destinations
-               but only admins read/write -->
-              <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" 
write="admins" admin="guests,users"/>
+              <!-- Grant only admins the ability to create/delete advisory 
destinations -->
+              <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" 
write="admins" admin="admins"/>
 
-              <!-- All users need full access to temporary destination 
advisories -->
-              <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*" 
write="*" admin="*"/>
-              <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*" 
write="*" admin="*"/>
+              <!-- All users need read access to temporary destination 
advisories -->
+              <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*" 
write="admin" admin="admin"/>
+              <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*" 
write="admin" admin="admin"/>
             </authorizationEntries>
             
             <!-- let's assign roles to temporary destinations. comment this 
entry if we don't want any roles assigned to temp destinations  -->
diff --git a/assembly/src/release/conf/activemq.xml 
b/assembly/src/release/conf/activemq.xml
index db176d4485..71285294a0 100644
--- a/assembly/src/release/conf/activemq.xml
+++ b/assembly/src/release/conf/activemq.xml
@@ -74,17 +74,24 @@
                  entirely; specify only packages you explicitly trust).
 
             NOTE ABOUT ADVISORY TOPICS:
-              1. All users need permission to create ActiveMQ.Advisory 
destinations,
-                 which is given by the "admin" acl. However, normal users 
should
-                 generally NOT be given access to read/write for advisories 
(except temp)
-                 as those messages are meant for admin users.
-              2. A notable exception to number 1 is regular users should be 
given access to
+              1. Normal (non-admin) users should generally NOT be given access 
to
+                 create/delete/read/write for advisories as those messages are 
meant for admin users.
+              2. A notable exception to number 1 is normal users should be 
given read access to
                  advisories for temporary destinations because 
ActiveMQConnection uses those advisories.
+                 Temp dest advisory topics are: ActiveMQ.Advisory.TempQueue 
and ActiveMQ.Advisory.TempTopic
               3. In addition, dynamic network connectors use advisories to 
determine
                  consumer demand so the users that will be used to create 
bridges need access
-                 consumer and virtual destination consumer advisories.
+                 to consumer and virtual destination consumer advisories. 
For example, assuming bridge-user
+                 is the user that is used for the network connection:
 
-            For more information, see:
+                 <authorizationEntries>
+                     ......
+                     <authorizationEntry topic="ActiveMQ.Advisory.Consumer.>" 
read="bridge-user,admins" write="bridge-user,admins" admin="bridge-user,admins"/>
+                     <authorizationEntry 
topic="ActiveMQ.Advisory.VirtualDestination.Consumer.>" 
read="bridge-user,admins" write="bridge-user,admins" admin="bridge-user,admins"/>
+                     ......
+                  </authorizationEntries>
+
+            For more information, see:
               https://activemq.apache.org/security
         -->
         <!--
@@ -97,9 +104,9 @@
                         <authorizationEntries>
                             <authorizationEntry queue=">" read="admins" 
write="admins" admin="admins" />
                             <authorizationEntry topic=">" read="admins" 
write="admins" admin="admins" />
-                            <authorizationEntry topic="ActiveMQ.Advisory.>" 
read="admins" write="admins" admin="admins,users" />
-                            <authorizationEntry 
topic="ActiveMQ.Advisory.TempQueue" read="admins,users" write="admins,users" 
admin="admins,users"/>
-                            <authorizationEntry 
topic="ActiveMQ.Advisory.TempTopic" read="admins,users" write="admins,users" 
admin="admins,users"/>
+                            <authorizationEntry topic="ActiveMQ.Advisory.>" 
read="admins" write="admins" admin="admins" />
+                            <authorizationEntry 
topic="ActiveMQ.Advisory.TempQueue" read="users,admins" write="admins" 
admin="admins"/>
+                            <authorizationEntry 
topic="ActiveMQ.Advisory.TempTopic" read="users,admins" write="admins" 
admin="admins"/>
                         </authorizationEntries>
                     </authorizationMap>
                 </map>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to