This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 3598fc5623 Send advisory messages using Broker connection context
(#2071)
3598fc5623 is described below
commit 3598fc562337c467b3f6b59364b29ac2d9bdb068
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jun 4 18:35:10 2026 -0400
Send advisory messages using Broker connection context (#2071)
This updates the AdvisoryBroker to always publish advisory messages
that were generated by other events to use the Broker's own
ConnectionContext. Before this change the AdvisoryBroker was
using the original ConnectionContext that used used for the action that
triggered the advisory. This doesn't make sense because its actually the
broker itself firing the advisory message and not the original
connection. It also meant requiring all users to be given access to
create new advisory topics that could be created on demand.
After this update, all users no longer need permission to create
advisory destinations which was required previously. Users only need
read access to the temporary destination advisories for the AMQ client
as the broker itself will now use its own context going forward to
create all the destinations on demand and for publishing.
This update also consolidates the on consumer with no messages advisory
into the Advisory broker so it is all managed in one location.
---
.../apache/activemq/advisory/AdvisoryBroker.java | 134 ++++++++++++++++-----
.../java/org/apache/activemq/broker/Broker.java | 8 ++
.../org/apache/activemq/broker/BrokerFilter.java | 5 +
.../org/apache/activemq/broker/BrokerService.java | 16 ++-
.../org/apache/activemq/broker/EmptyBroker.java | 5 +
.../org/apache/activemq/broker/ErrorBroker.java | 5 +
.../activemq/broker/region/BaseDestination.java | 50 +-------
.../activemq/broker/util/LoggingBrokerPlugin.java | 9 ++
.../network/DemandForwardingBridgeSupport.java | 12 +-
.../transport/mqtt/MQTTAuthTestSupport.java | 12 +-
.../activemq/transport/stomp/StompTestSupport.java | 12 +-
.../apache/activemq/advisory/AdvisoryTests.java | 36 +++++-
.../activemq/broker/policy/SecureDLQTest.java | 4 +-
.../security/DestinationAdminAuthzTest.java | 9 +-
.../security/SimpleSecurityBrokerSystemTest.java | 4 +-
.../security/jaas-broker-guest-no-creds-only.xml | 9 +-
.../apache/activemq/security/jaas-broker-guest.xml | 9 +-
.../org/apache/activemq/security/jaas-broker.xml | 11 +-
assembly/src/release/conf/activemq.xml | 27 +++--
19 files changed, 256 insertions(+), 121 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index fb797a6793..c942a399bd 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -23,9 +23,11 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.Broker;
@@ -61,7 +63,6 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.filter.DestinationPath;
-import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator;
@@ -79,6 +80,7 @@ public class AdvisoryBroker extends BrokerFilter {
private static final Logger LOG =
LoggerFactory.getLogger(AdvisoryBroker.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();
+ protected final AtomicReference<ConnectionContext>
advisoryConnectionContext = new AtomicReference<>();
protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections =
new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
private final ReentrantReadWriteLock consumersLock = new
ReentrantReadWriteLock();
@@ -107,13 +109,40 @@ public class AdvisoryBroker extends BrokerFilter {
private final LongSequenceGenerator messageIdGenerator = new
LongSequenceGenerator();
- private VirtualDestinationMatcher virtualDestinationMatcher = new
DestinationFilterVirtualDestinationMatcher();
+ private final VirtualDestinationMatcher virtualDestinationMatcher = new
DestinationFilterVirtualDestinationMatcher();
public AdvisoryBroker(Broker next) {
super(next);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
}
+ @Override
+ public void setAdminConnectionContext(ConnectionContext
adminConnectionContext) {
+ super.setAdminConnectionContext(adminConnectionContext);
+ // Create a copy of the adminConnection context and set flow control
false
+ // This will be used to publish all advisories. This will be called
+ // during broker construction and before the first advisories are sent.
+ ConnectionContext connectionContext = adminConnectionContext.copy();
+ connectionContext.setProducerFlowControl(false);
+ this.advisoryConnectionContext.set(connectionContext);
+ }
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+ // Sanity check to make sure we setAdminConnectionContext() was called
and
+ // we initialized the admin context
+ if (advisoryConnectionContext.get() == null) {
+ throw new IllegalArgumentException("AdminConnectionContext was not
initialized");
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ super.stop();
+ this.advisoryConnectionContext.set(null);
+ }
+
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info)
throws Exception {
super.addConnection(context, info);
@@ -540,6 +569,46 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
+ @Override
+ public void messageNoConsumers(ConnectionContext context, MessageReference
messageReference) {
+ super.messageNoConsumers(context, messageReference);
+ try {
+ if (!messageReference.isAdvisory()) {
+ // allow messages with no consumers to be dispatched to a dead
+ // letter queue
+ BaseDestination baseDestination = (BaseDestination)
messageReference.getMessage().getRegionDestination();
+ ActiveMQDestination destination =
baseDestination.getActiveMQDestination();
+ if (destination.isQueue() ||
!AdvisorySupport.isAdvisoryTopic(destination)) {
+
+ Message message = messageReference.getMessage().copy();
+ // The original destination and transaction id do not get
+ // filled when the message is first sent,
+ // it is only populated if the message is routed to another
+ // destination like the DLQ
+ if (message.getOriginalDestination() != null) {
+
message.setOriginalDestination(message.getDestination());
+ }
+ if (message.getOriginalTransactionId() != null) {
+
message.setOriginalTransactionId(message.getTransactionId());
+ }
+
+ ActiveMQTopic advisoryTopic;
+ if (destination.isQueue()) {
+ advisoryTopic =
AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
+ } else {
+ advisoryTopic =
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
+ }
+ message.setDestination(advisoryTopic);
+ message.setTransactionId(null);
+
+ context.getBroker().send(newAdvisoryProducerExchange(),
message);
+ }
+ }
+ } catch (Exception e) {
+ handleFireFailure("discarded", e);
+ }
+ }
+
@Override
public void slowConsumer(ConnectionContext context, Destination
destination, Subscription subs) {
super.slowConsumer(context, destination, subs);
@@ -777,10 +846,7 @@ public class AdvisoryBroker extends BrokerFilter {
try {
ActiveMQTopic topic =
AdvisorySupport.getMasterBrokerAdvisoryTopic();
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- ConnectionContext context = new ConnectionContext();
-
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- context.setBroker(getBrokerService().getBroker());
- fireAdvisory(context, topic, null, null, advisoryMessage);
+ fireAdvisory(topic, null, advisoryMessage);
} catch (Exception e) {
handleFireFailure("now master broker", e);
}
@@ -819,12 +885,7 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryMessage.setStringProperty("remoteIp", remoteIp);
networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
- ActiveMQTopic topic =
AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
- ConnectionContext context = new ConnectionContext();
-
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- context.setBroker(getBrokerService().getBroker());
- fireAdvisory(context, topic, brokerInfo, null,
advisoryMessage);
+ fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(),
brokerInfo, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("network bridge started", e);
@@ -839,12 +900,7 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryMessage.setBooleanProperty("started", false);
networkBridges.remove(brokerInfo);
- ActiveMQTopic topic =
AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
- ConnectionContext context = new ConnectionContext();
-
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- context.setBroker(getBrokerService().getBroker());
- fireAdvisory(context, topic, brokerInfo, null,
advisoryMessage);
+ fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(),
brokerInfo, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("network bridge stopped", e);
@@ -901,7 +957,19 @@ public class AdvisoryBroker extends BrokerFilter {
fireAdvisory(context, topic, command, targetConsumerId,
advisoryMessage);
}
- public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic,
Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage)
throws Exception {
+ public void fireFailedForwardAdvisory(Message message, Throwable error)
throws Exception {
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setStringProperty("cause",
error.getLocalizedMessage());
+
+
fireAdvisory(AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(),
message, advisoryMessage);
+ }
+
+ private void fireAdvisory(ActiveMQTopic topic, Command command,
ActiveMQMessage advisoryMessage) throws Exception {
+ fireAdvisory(advisoryConnectionContext.get(), topic, command, null,
advisoryMessage);
+ }
+
+ private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic,
Command command,
+ ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage)
throws Exception {
//set properties
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME,
getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() :
"NOT_SET";
@@ -927,17 +995,11 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
- boolean originalFlowControl = context.isProducerFlowControl();
- final ProducerBrokerExchange producerExchange = new
ProducerBrokerExchange();
- producerExchange.setConnectionContext(context);
- producerExchange.setMutable(true);
- producerExchange.setProducerState(new ProducerState(new
ProducerInfo()));
- try {
- context.setProducerFlowControl(false);
- next.send(producerExchange, advisoryMessage);
- } finally {
- context.setProducerFlowControl(originalFlowControl);
- }
+
+ // The advisory messages are generated the broker itself, so this send
will
+ // publish the advisory message using the Broker ConnectionContext so
there will
+ // be admin permissions granted.
+ next.send(newAdvisoryProducerExchange(), advisoryMessage);
}
public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
@@ -965,7 +1027,7 @@ public class AdvisoryBroker extends BrokerFilter {
return virtualDestinationConsumers;
}
- private class VirtualConsumerPair {
+ protected class VirtualConsumerPair {
private final VirtualDestination virtualDestination;
//destination that matches this virtualDestination as part target
@@ -1030,4 +1092,14 @@ public class AdvisoryBroker extends BrokerFilter {
return AdvisoryBroker.this;
}
}
+
+ protected ProducerBrokerExchange newAdvisoryProducerExchange() {
+ final ProducerBrokerExchange producerExchange = new
ProducerBrokerExchange();
+
producerExchange.setConnectionContext(Objects.requireNonNull(advisoryConnectionContext.get(),
+ "Advisory ConnectionContext must not be null"));
+ producerExchange.setMutable(true);
+ producerExchange.setProducerState(new ProducerState(new
ProducerInfo()));
+ return producerExchange;
+ }
+
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
index b96167c5ef..0462b76cfa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
@@ -370,6 +370,14 @@ public interface Broker extends Region, Service {
*/
void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference);
+ /**
+ * Called when a message is processed with no consumers
+ *
+ * @param context connection context
+ * @param messageReference message reference
+ */
+ void messageNoConsumers(ConnectionContext context, MessageReference
messageReference);
+
/**
* Called when there is a slow consumer
* @param context connection context
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
index b9374e352d..c0c5d05d63 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
@@ -361,6 +361,11 @@ public class BrokerFilter implements Broker {
getNext().messageDiscarded(context, sub, messageReference);
}
+ @Override
+ public void messageNoConsumers(ConnectionContext context, MessageReference
messageReference) {
+ getNext().messageNoConsumers(context, messageReference);
+ }
+
@Override
public void slowConsumer(ConnectionContext context, Destination
destination,Subscription subs) {
getNext().slowConsumer(context, destination,subs);
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index f00be0f833..74599becf7 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -733,8 +733,18 @@ public class BrokerService implements Service {
}
}
+ // Ensure the broker chain is fully initialized and we create the admin
connection.
+ // The admin connection is needed to create destinations and for the
AdvisoryBroker
+ // before broker startup. Creating the connection will also call the
+ // setAdminConnectionContext() callback on the Broker chain. This ensures
initialization
+ // is done correctly even if someone overrides getAdminConnectionContext();
+ private void initializeAdminConnection() throws Exception {
+ BrokerSupport.getConnectionContext(getBroker());
+ }
+
private void doStartBroker() throws Exception {
checkStartException();
+ initializeAdminConnection();
startDestinations();
addShutdownHook();
@@ -2447,7 +2457,7 @@ public class BrokerService implements Service {
protected Broker addInterceptors(Broker broker) throws Exception {
if (isAdvisorySupport()) {
// AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker
- broker = new AdvisoryBroker(broker);
+ broker = createAdvisoryBroker(broker);
}
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker,
getJobSchedulerStore());
@@ -2515,6 +2525,10 @@ public class BrokerService implements Service {
}
}
+ protected AdvisoryBroker createAdvisoryBroker(Broker broker) {
+ return new AdvisoryBroker(broker);
+ }
+
protected ObjectName createBrokerObjectName() throws
MalformedObjectNameException {
return
BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(),
getBrokerName());
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
index 4872a5a0fa..7bd65fcc71 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
@@ -317,6 +317,11 @@ public class EmptyBroker implements Broker {
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) {
}
+ @Override
+ public void messageNoConsumers(ConnectionContext context, MessageReference
messageReference) {
+
+ }
+
@Override
public void slowConsumer(ConnectionContext context,Destination
destination, Subscription subs) {
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
index 8c138e3945..ccfbc9daa8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
@@ -358,6 +358,11 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message);
}
+ @Override
+ public void messageNoConsumers(ConnectionContext context, MessageReference
messageReference) {
+ throw new BrokerStoppedException(this.message);
+ }
+
@Override
public void slowConsumer(ConnectionContext context, Destination
destination,Subscription subs) {
throw new BrokerStoppedException(this.message);
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 82ed8784a2..6aaa317fe4 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import jakarta.jms.ResourceAllocationException;
-import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@@ -643,50 +642,11 @@ public abstract class BaseDestination implements
Destination {
* Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/
- protected void onMessageWithNoConsumers(ConnectionContext context, Message
msg) throws Exception {
- if (!msg.isPersistent()) {
- if (isSendAdvisoryIfNoConsumers()) {
- // allow messages with no consumers to be dispatched to a dead
- // letter queue
- if (destination.isQueue() ||
!AdvisorySupport.isAdvisoryTopic(destination)) {
-
- Message message = msg.copy();
- // The original destination and transaction id do not get
- // filled when the message is first sent,
- // it is only populated if the message is routed to another
- // destination like the DLQ
- if (message.getOriginalDestination() != null) {
-
message.setOriginalDestination(message.getDestination());
- }
- if (message.getOriginalTransactionId() != null) {
-
message.setOriginalTransactionId(message.getTransactionId());
- }
-
- ActiveMQTopic advisoryTopic;
- if (destination.isQueue()) {
- advisoryTopic =
AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
- } else {
- advisoryTopic =
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
- }
- message.setDestination(advisoryTopic);
- message.setTransactionId(null);
-
- // Disable flow control for this since since we don't want
- // to block.
- boolean originalFlowControl =
context.isProducerFlowControl();
- try {
- context.setProducerFlowControl(false);
- ProducerBrokerExchange producerExchange = new
ProducerBrokerExchange();
- producerExchange.setMutable(false);
- producerExchange.setConnectionContext(context);
- producerExchange.setProducerState(new
ProducerState(new ProducerInfo()));
- context.getBroker().send(producerExchange, message);
- } finally {
- context.setProducerFlowControl(originalFlowControl);
- }
-
- }
- }
+ protected void onMessageWithNoConsumers(ConnectionContext context, Message
msg) {
+ if (!msg.isPersistent() && isSendAdvisoryIfNoConsumers()) {
+ // allow messages with no consumers to be dispatched to a dead
+ // letter queue
+ broker.messageNoConsumers(context, msg);
}
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
index 798f5644cb..72c6f352d2 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
@@ -559,6 +559,15 @@ public class LoggingBrokerPlugin extends
BrokerPluginSupport {
super.messageDiscarded(context, sub, messageReference);
}
+ @Override
+ public void messageNoConsumers(ConnectionContext context, MessageReference
messageReference) {
+ if (isLogAll() || isLogInternalEvents()) {
+ String msg = messageReference.getMessage().toString();
+ LOG.info("Message without consumers: {}", msg);
+ }
+ super.messageNoConsumers(context, messageReference);
+ }
+
@Override
public void slowConsumer(ConnectionContext context, Destination
destination, Subscription subs) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 8d16445fb9..26a5769ed0 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1152,18 +1152,10 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
advisoryBroker = (AdvisoryBroker)
brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
if (advisoryBroker != null) {
- ConnectionContext context = new ConnectionContext();
-
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- context.setBroker(brokerService.getBroker());
-
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setStringProperty("cause",
error.getLocalizedMessage());
- advisoryBroker.fireAdvisory(context,
AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(),
messageDispatch.getMessage(), null,
- advisoryMessage);
-
+
advisoryBroker.fireFailedForwardAdvisory(messageDispatch.getMessage(), error);
}
} catch (Exception e) {
- LOG.warn("failed to fire forward failure advisory, cause: {}",
e);
+ LOG.warn("failed to fire forward failure advisory, cause: {}",
e.getMessage());
LOG.debug("detail", e);
}
}
diff --git
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
index 6e327bfe53..11de298419 100644
---
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
+++
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
@@ -103,10 +103,16 @@ public class MQTTAuthTestSupport extends MQTTTestSupport {
entry.setAdmin("guests,users,anonymous");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
- entry.setTopic("ActiveMQ.Advisory.>");
+ entry.setTopic("ActiveMQ.Advisory.TempQueue");
entry.setRead("guests,users,anonymous");
- entry.setWrite("guests,users,anonymous");
- entry.setAdmin("guests,users,anonymous");
+ entry.setWrite("admins");
+ entry.setAdmin("admins");
+ authorizationEntries.add(entry);
+ entry = new AuthorizationEntry();
+ entry.setTopic("ActiveMQ.Advisory.TempTopic");
+ entry.setRead("guests,users,anonymous");
+ entry.setWrite("admins");
+ entry.setAdmin("admins");
authorizationEntries.add(entry);
TempDestinationAuthorizationEntry tempEntry = new
TempDestinationAuthorizationEntry();
diff --git
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
index ecdeae5833..e155599d2d 100644
---
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
+++
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
@@ -292,10 +292,16 @@ public class StompTestSupport {
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
- entry.setTopic("ActiveMQ.Advisory.>");
+ entry.setTopic("ActiveMQ.Advisory.TempQueue");
entry.setRead("guests,users");
- entry.setWrite("guests,users");
- entry.setAdmin("guests,users");
+ entry.setWrite("admins");
+ entry.setAdmin("admins");
+ authorizationEntries.add(entry);
+ entry = new AuthorizationEntry();
+ entry.setTopic("ActiveMQ.Advisory.TempTopic");
+ entry.setRead("guests,users");
+ entry.setWrite("admins");
+ entry.setAdmin("admins");
authorizationEntries.add(entry);
TempDestinationAuthorizationEntry tempEntry = new
TempDestinationAuthorizationEntry();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 008b936738..1bfc74cdd9 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -19,12 +19,14 @@ package org.apache.activemq.advisory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
@@ -45,6 +47,8 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -53,6 +57,7 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.test.annotations.ParallelTest;
import org.junit.After;
import org.junit.Before;
@@ -690,7 +695,35 @@ public class AdvisoryTests {
}
protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
+ BrokerService answer = new BrokerService() {
+ // Wrap the broker used by the Advisory broker so we can intercept
the send()
+ // calls and verify any messages published to advisory topics by
the advisory broker
+ // use the right context. The AdvisoryBroker delegates to the
"next" broker in the
+ // chain when sending generated advisories.
+ @Override
+ protected AdvisoryBroker createAdvisoryBroker(Broker broker) {
+ return new AdvisoryBroker(new BrokerFilter(broker) {
+ // track first connection context used for advisories
+ private final AtomicReference<ConnectionContext> first =
new AtomicReference<>();
+
+ @Override
+ public void send(ProducerBrokerExchange producerExchange,
+ org.apache.activemq.command.Message messageSend)
throws Exception {
+ super.send(producerExchange, messageSend);
+ // Verify all advisory topic publishes use the admin
context
+ // This filter is only used by the advisory broker so
all published
+ // advisories should be the broker security context
+ if
(AdvisorySupport.isAdvisoryTopic(messageSend.getDestination())) {
+ first.compareAndSet(null,
producerExchange.getConnectionContext());
+
assertEquals(SecurityContext.BROKER_SECURITY_CONTEXT,
+
producerExchange.getConnectionContext().getSecurityContext());
+ // ConnectionContext is reused for each message
(but producer exchange is not)
+ assertSame(first.get(),
producerExchange.getConnectionContext());
+ }
+ }
+ });
+ }
+ };
configureBroker(answer);
answer.start();
return answer;
@@ -745,6 +778,7 @@ public class AdvisoryTests {
super.preProcessDispatch(messageDispatch);
}
+
};
}
} });
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
index f10682944e..41609d5e30 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
@@ -46,8 +46,8 @@ public class SecureDLQTest extends DeadLetterTestSupport {
writeAccess.put(new ActiveMQQueue("TEST"), USERS);
writeAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
- readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
- writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+ readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"),
WILDCARD);
+ readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"),
WILDCARD);
DestinationMap adminAccess = new DefaultAuthorizationMap();
adminAccess.put(new ActiveMQQueue("TEST"), ADMINS);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
index a50d4e55e7..9811704ced 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java
@@ -271,9 +271,12 @@ public class DestinationAdminAuthzTest {
adminAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP);
adminAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP);
- readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
- writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
- adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+ readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"),
WILDCARD);
+ writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"),
ADMINS);
+ adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"),
ADMINS);
+ readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"),
WILDCARD);
+ writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"),
ADMINS);
+ adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"),
ADMINS);
var authorizationMap = new SimpleAuthorizationMap(writeAccess,
readAccess, adminAccess);
var tempDestinationAuthorizationEntry = new
TempDestinationAuthorizationEntry();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
index d87902add3..97dcd565f7 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
@@ -134,8 +134,8 @@ public class SimpleSecurityBrokerSystemTest extends
SecurityTestSupport {
writeAccess.put(new ActiveMQTopic("GUEST.>"), USERS);
writeAccess.put(new ActiveMQTopic("GUEST.>"), GUESTS);
- readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
- writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+ readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"),
WILDCARD);
+ readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"),
WILDCARD);
DestinationMap adminAccess = new DefaultAuthorizationMap();
adminAccess.put(new ActiveMQTopic(">"), ADMINS);
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
index 6ad1e72c2f..4c171a05b2 100644
---
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml
@@ -41,7 +41,12 @@
<authorizationEntry queue=">" read="admins" write="admins"
admin="admins"/>
<authorizationEntry topic=">" read="admins" write="admins"
admin="admins"/>
<authorizationEntry queue="GuestQueue" read="admins"
write="admins, guests" admin="admins"/>
- <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests"
write="guests" admin="guests"/>
+ <!-- Grant only admins the ability to create/delete advisory
destinations -->
+ <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins"
write="admins" admin="admins"/>
+
+ <!-- All users need read access to temporary destination
advisories -->
+ <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*"
write="admin" admin="admin"/>
+ <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*"
write="admin" admin="admin"/>
</authorizationEntries>
</authorizationMap>
</map>
@@ -53,4 +58,4 @@
</transportConnectors>
</broker>
-</beans>
\ No newline at end of file
+</beans>
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
index c8474a349e..c2d10ba5dd 100644
---
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml
@@ -41,7 +41,12 @@
<authorizationEntry queue=">" read="admins" write="admins"
admin="admins"/>
<authorizationEntry topic=">" read="admins" write="admins"
admin="admins"/>
<authorizationEntry queue="GuestQueue" read="admins"
write="admins, guests" admin="admins"/>
- <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests"
write="guests" admin="guests"/>
+ <!-- Grant only admins the ability to create/delete advisory
destinations -->
+ <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins"
write="admins" admin="admins"/>
+
+ <!-- All users need read access to temporary destination
advisories -->
+ <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*"
write="admin" admin="admin"/>
+ <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*"
write="admin" admin="admin"/>
</authorizationEntries>
</authorizationMap>
</map>
@@ -53,4 +58,4 @@
</transportConnectors>
</broker>
-</beans>
\ No newline at end of file
+</beans>
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
index 246921584f..a30f17aa41 100644
---
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml
@@ -46,13 +46,12 @@
<authorizationEntry topic="USERS.>" read="users" write="users"
admin="users" />
<authorizationEntry topic="GUEST.>" read="guests"
write="guests,users" admin="guests,users" />
- <!-- Grant all users the ability to create/delete advisory
destinations
- but only admins read/write -->
- <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins"
write="admins" admin="guests,users"/>
+ <!-- Grant only admins the ability to create/delete advisory
destinations -->
+ <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins"
write="admins" admin="admins"/>
- <!-- All users need full access to temporary destination
advisories -->
- <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*"
write="*" admin="*"/>
- <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*"
write="*" admin="*"/>
+ <!-- All users need read access to temporary destination
advisories -->
+ <authorizationEntry topic="ActiveMQ.Advisory.TempQueue" read="*"
write="admin" admin="admin"/>
+ <authorizationEntry topic="ActiveMQ.Advisory.TempTopic" read="*"
write="admin" admin="admin"/>
</authorizationEntries>
<!-- let's assign roles to temporary destinations. comment this
entry if we don't want any roles assigned to temp destinations -->
diff --git a/assembly/src/release/conf/activemq.xml
b/assembly/src/release/conf/activemq.xml
index db176d4485..71285294a0 100644
--- a/assembly/src/release/conf/activemq.xml
+++ b/assembly/src/release/conf/activemq.xml
@@ -74,17 +74,24 @@
entirely; specify only packages you explicitly trust).
NOTE ABOUT ADVISORY TOPICS:
- 1. All users need permission to create ActiveMQ.Advisory
destinations,
- which is given by the "admin" acl. However, normal users
should
- generally NOT be given access to read/write for advisories
(except temp)
- as those messages are meant for admin users.
- 2. A notable exception to number 1 is regular users should be
given access to
+ 1. Normal (non-admin) users should generally NOT be given access
to
+ create/delete/read/write for advisories as those messages are
meant for admin users.
+ 2. A notable exception to number 1 is normal users should be
given read access to
advisories for temporary destinations because
ActiveMQConnection uses those advisories.
+ Temp dest advisory topics are: ActiveMQ.Advisory.TempQueue
and ActiveMQ.Advisory.TempTopic
3. In addition, dynamic network connectors use advisories to
determine
consumer demand so the users that will be used to create
bridges need access
- consumer and virtual destination consumer advisories.
+ consumer and virtual destination consumer advisories.
Example, assuming bridge-user
+ is the user that is used for the network connection:
- For more information, see:
+ <authorizationEntries>
+ ......
+ <authorizationEntry topic="ActiveMQ.Advisory.Consumer.>"
read="bridge-user,admin" write="bridge-user,admin" admin="bridge-user,admin"/>
+ <authorizationEntry
topic="ActiveMQ.Advisory.VirtualDestination.Consumer.>"
read="bridge-user,admin" write="bridge-user" admin="bridge-user,admin"/>
+ ......
+ </authorizationEntries>
+
+ For more information, see:
https://activemq.apache.org/security
-->
<!--
@@ -97,9 +104,9 @@
<authorizationEntries>
<authorizationEntry queue=">" read="admins"
write="admins" admin="admins" />
<authorizationEntry topic=">" read="admins"
write="admins" admin="admins" />
- <authorizationEntry topic="ActiveMQ.Advisory.>"
read="admins" write="admins" admin="admins,users" />
- <authorizationEntry
topic="ActiveMQ.Advisory.TempQueue" read="admins,users" write="admins,users"
admin="admins,users"/>
- <authorizationEntry
topic="ActiveMQ.Advisory.TempTopic" read="admins,users" write="admins,users"
admin="admins,users"/>
+ <authorizationEntry topic="ActiveMQ.Advisory.>"
read="admins" write="admins" admin="admins" />
+ <authorizationEntry
topic="ActiveMQ.Advisory.TempQueue" read="users,admins" write="admins"
admin="admins"/>
+ <authorizationEntry
topic="ActiveMQ.Advisory.TempTopic" read="users,admins" write="admins"
admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact