gemmellr commented on code in PR #5819:
URL: https://github.com/apache/activemq-artemis/pull/5819#discussion_r2182734241
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java:
##########
@@ -156,6 +158,164 @@ public void connectionFailed(ActiveMQException exception,
boolean failedOver, St
}
}
+ @Test
+ @Timeout(20)
+ public void testOutboundConnectsWithOfferedAndDesiredCapabilities() throws
Exception {
+ // Tests that the underlying AMQPConnectionContext will honor the set
offered and desired capabilities
+ // and place them in the outgoing Open performative.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().withOfferedCapability("ANONYMOUS_RELAY")
+ .withDesiredCapability("SHARED_SUBS")
+ .respond();
+ peer.start();
+
+ final Map<String, Object> config = new LinkedHashMap<>();
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ config.put(TransportConstants.PORT_PROP_NAME,
String.valueOf(peer.getServerURI().getPort()));
+
+ final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+ if (availableMechanims != null &&
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+ return new AnonymousSASLMechanism();
+ } else {
+ return null;
+ }
+ };
+
+ final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+ EventHandler eventHandler = new EventHandler() {
+ @Override
+ public void onRemoteOpen(Connection connection) throws Exception {
+ connectionOpened.set(true);
+ }
+ };
+
+ final Symbol[] offeredCapabilities = new Symbol[]
{Symbol.valueOf("ANONYMOUS_RELAY")};
+ final Symbol[] desiredCapabilities = new Symbol[]
{Symbol.valueOf("SHARED_SUBS")};
+
+ AMQPClientConnectionFactory clientFactory = new
AMQPClientConnectionFactory(server, "myid",
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000,
offeredCapabilities, desiredCapabilities);
+ ProtonClientConnectionManager lifeCycleListener = new
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler),
clientSASLFactory);
+ ProtonClientProtocolManager protocolManager = new
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+ NettyConnector connector = new NettyConnector(config,
lifeCycleListener, lifeCycleListener,
server.getExecutorFactory().getExecutor(),
server.getExecutorFactory().getExecutor(), server.getScheduledPool(),
protocolManager);
+
+ try {
+ connector.start();
+
+ assertNotNull(connector.createConnection().getID());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ Wait.assertTrue(connectionOpened::get);
+ } finally {
+ lifeCycleListener.stop();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testOutboundTreatsEmptyOfferedAndDesiredAsNoCapabilities()
throws Exception {
+ // Tests that the underlying AMQPConnectionContext will treat empty
offered and desired capabilities
+ // arrays as being nothing to send and proceeding as normal with old
default behavior of sending nothing.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().withOfferedCapabilities(nullValue())
+ .withDesiredCapabilities(nullValue())
+ .respond();
+ peer.start();
+
+ final Map<String, Object> config = new LinkedHashMap<>();
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ config.put(TransportConstants.PORT_PROP_NAME,
String.valueOf(peer.getServerURI().getPort()));
+
+ final ClientSASLFactory clientSASLFactory = availableMechanims -> {
+ if (availableMechanims != null &&
Arrays.asList(availableMechanims).contains("ANONYMOUS")) {
+ return new AnonymousSASLMechanism();
+ } else {
+ return null;
+ }
+ };
+
+ final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+ EventHandler eventHandler = new EventHandler() {
+ @Override
+ public void onRemoteOpen(Connection connection) throws Exception {
+ connectionOpened.set(true);
+ }
+ };
+
+ final Symbol[] offeredCapabilities = new Symbol[0];
+ final Symbol[] desiredCapabilities = new Symbol[0];
+
+ AMQPClientConnectionFactory clientFactory = new
AMQPClientConnectionFactory(server, "myid",
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000,
offeredCapabilities, desiredCapabilities);
+ ProtonClientConnectionManager lifeCycleListener = new
ProtonClientConnectionManager(clientFactory, Optional.of(eventHandler),
clientSASLFactory);
+ ProtonClientProtocolManager protocolManager = new
ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+ NettyConnector connector = new NettyConnector(config,
lifeCycleListener, lifeCycleListener,
server.getExecutorFactory().getExecutor(),
server.getExecutorFactory().getExecutor(), server.getScheduledPool(),
protocolManager);
+
+ try {
+ connector.start();
+
+ assertNotNull(connector.createConnection().getID());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ Wait.assertTrue(connectionOpened::get);
+ } finally {
+ lifeCycleListener.stop();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testOutboundRemainsDefatultedToNoOfferedOrDesiredCapabilities()
throws Exception {
Review Comment:
Typo
```suggestion
public void
testOutboundRemainsDefaultedToNoOfferedOrDesiredCapabilities() throws Exception
{
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact