This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ac1b483cb4 ARTEMIS-4801 Fix issue with caching address query results
ac1b483cb4 is described below
commit ac1b483cb449b72b5eaa2b4c6547bc7cdea92dae
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jun 6 16:50:26 2024 -0400
ARTEMIS-4801 Fix issue with caching address query results
When address query results are cached, the remote session can be blocked
forever from creating links on an address if the "does not exist" value is
cached, since it is never updated again and will always report "does not
exist" even if the address is added manually via management later. The cache
state can cause other issues for long-running sessions as well and should be
removed to avoid attach failures in cases where the current broker state
could allow the attach to succeed but the cached entry won't allow it.
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 11 +--
.../integration/amqp/AmqpSendReceiveTest.java | 91 +++++++++++++++++++++-
.../connect/AMQPFederationAddressPolicyTest.java | 75 ++++++++++++++++++
3 files changed, 164 insertions(+), 13 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f6c9167459..4a68102581 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -108,8 +108,6 @@ public class AMQPSessionCallback implements SessionCallback
{
private final CoreMessageObjectPools coreMessageObjectPools = new
CoreMessageObjectPools();
- private final AddressQueryCache<AddressQueryResult> addressQueryCache = new
AddressQueryCache<>();
-
private ProtonTransactionHandler transactionHandler;
private final RunnableList blockedRunnables = new RunnableList();
@@ -409,12 +407,7 @@ public class AMQPSessionCallback implements
SessionCallback {
RoutingType routingType,
boolean autoCreate) throws Exception
{
- AddressQueryResult addressQueryResult =
addressQueryCache.getResult(addressName);
- if (addressQueryResult != null) {
- return addressQueryResult;
- }
-
- addressQueryResult = serverSession.executeAddressQuery(addressName);
+ AddressQueryResult addressQueryResult =
serverSession.executeAddressQuery(addressName);
if (!addressQueryResult.isExists() &&
addressQueryResult.isAutoCreateAddresses() && autoCreate) {
try {
@@ -422,10 +415,10 @@ public class AMQPSessionCallback implements
SessionCallback {
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean
time. Catch and do nothing.
}
+
addressQueryResult = serverSession.executeAddressQuery(addressName);
}
- addressQueryCache.setResult(addressName, addressQueryResult);
return addressQueryResult;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 66594aab54..62e0ef8fcf 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -55,6 +55,8 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.jgroups.util.UUID;
@@ -1256,7 +1258,6 @@ public class AmqpSendReceiveTest extends
AmqpClientTestSupport {
connection.close();
}
-
@Test
@Timeout(60)
public void testReceiveRejecting() throws Exception {
@@ -1277,8 +1278,6 @@ public class AmqpSendReceiveTest extends
AmqpClientTestSupport {
sender.send(message);
}
-
-
Queue queueView = getProxyToQueue(address);
for (int i = 0; i < MSG_COUNT; i++) {
@@ -1296,11 +1295,95 @@ public class AmqpSendReceiveTest extends
AmqpClientTestSupport {
assertNull(receiver.receive(1, TimeUnit.MILLISECONDS));
-
Wait.assertEquals(0, queueView::getDeliveringCount);
connection.close();
}
+ @Test
+ @Timeout(60)
+ public void
testCreateTopicReceiverOnAddressThatDoesNotExistOnPreviousAttempt() throws
Exception {
+ final AmqpClient client = createAmqpClient();
+ final AmqpConnection connection = addConnection(client.connect());
+ final AmqpSession session = connection.createSession();
+ final String address = "test";
+
+ final Source source = new Source();
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setCapabilities(Symbol.getSymbol("topic"));
+ source.setAddress(address);
+
+ try {
+ session.createReceiver(source, "test-receiver-subscription");
+ fail("Should not be able to create the receiver");
+ } catch (Exception ex) {
+ // Expected
+ }
+
+ server.addAddressInfo(new AddressInfo(SimpleString.of(address),
RoutingType.MULTICAST));
+
+ AmqpReceiver receiver = null;
+
+ try {
+ receiver = session.createReceiver(source,
"test-receiver-subscription");
+ receiver.flow(1);
+ } catch (Exception ex) {
+ fail("Should be able to create the receiver");
+ }
+
+ final AmqpSender sender = session.createSender(address);
+ final AmqpMessage message = new AmqpMessage();
+ message.setText("TestPayload");
+
+ sender.send(message);
+
+ assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+
+ connection.close();
+ }
+
+ @Test
+ @Timeout(60)
+ public void
testCreateQueueReceiverOnAddressThenRedoAsTopicReceiverAfterAddressUpdated()
throws Exception {
+ final AmqpClient client = createAmqpClient();
+ final AmqpConnection connection = addConnection(client.connect());
+ final AmqpSession session = connection.createSession();
+ final String address = "test";
+
+ server.addAddressInfo(new AddressInfo(SimpleString.of(address),
RoutingType.ANYCAST));
+
+ final Source source = new Source();
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setCapabilities(Symbol.getSymbol("topic"));
+ source.setAddress(address);
+
+ try {
+ session.createReceiver(source, "test-receiver-subscription");
+ fail("Should not be able to create the receiver");
+ } catch (Exception ex) {
+ // Expected
+ }
+
+ server.removeAddressInfo(SimpleString.of(address), null);
+ server.addAddressInfo(new AddressInfo(SimpleString.of(address),
RoutingType.MULTICAST));
+ AmqpReceiver receiver = null;
+
+ try {
+ receiver = session.createReceiver(source,
"test-receiver-subscription");
+ receiver.flow(1);
+ } catch (Exception ex) {
+ fail("Should be able to create the receiver");
+ }
+
+ final AmqpSender sender = session.createSender(address);
+ final AmqpMessage message = new AmqpMessage();
+ message.setText("TestPayload");
+
+ sender.send(message);
+
+ assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+
+ connection.close();
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index e36c8c8ac6..c6d7da18af 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -3575,6 +3575,81 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testBrokerAllowsAttachToPreviouslyNonExistentAddressAfterItIsAdded() throws
Exception {
+ final AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setAutoCreateAddresses(false);
+
+ server.getConfiguration().getAddressSettings().put("#", addressSettings);
+ server.start();
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withNullSource();
+ peer.expectDetach().respond();
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ // and expect a rejected attach as the address does not yet exist and
auto create
+ // has been disabled.
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ server.addAddressInfo(new
AddressInfo("test").addRoutingType(RoutingType.MULTICAST));
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Attempt attach again as if new address demand has been added and
the policy manager reacts.
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
private static void sendAddressAddedEvent(ProtonTestPeer peer, String
address, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact