[
https://issues.apache.org/jira/browse/ARTEMIS-3915?focusedWorklogId=984826&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-984826
]
ASF GitHub Bot logged work on ARTEMIS-3915:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 26/Sep/25 20:16
Start Date: 26/Sep/25 20:16
Worklog Time Spent: 10m
Work Description: jbertram commented on code in PR #5908:
URL: https://github.com/apache/activemq-artemis/pull/5908#discussion_r2383405366
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proxyprotocol/HAProxyTest.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proxyprotocol;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueRequestor;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper;
+import
org.apache.activemq.artemis.core.management.impl.view.predicate.ActiveMQFilterPredicate;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;
+import org.apache.activemq.artemis.json.JsonArray;
+import org.apache.activemq.artemis.json.JsonObject;
+import
org.apache.activemq.artemis.tests.integration.jms.multiprotocol.MultiprotocolJMSClientTestSupport;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.netty.NettyHAProxyServer;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
+import org.eclipse.paho.mqttv5.client.IMqttToken;
+import org.eclipse.paho.mqttv5.client.MqttCallback;
+import org.eclipse.paho.mqttv5.client.MqttClient;
+import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
+import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
+import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.AT_LEAST_ONCE;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HAProxyTest extends MultiprotocolJMSClientTestSupport {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int BROKER_PROXY_PORT = 61617;
+ private static final int BROKER_PROXY_SSL_PORT = 61618;
+ private static final int BROKER_STANDARD_PORT = 61616;
+
+ // the following fake IPs and ports are injected by the NettyHAProxyServer
to verify functionality on the broker
+ private static final String HEADER_SOURCE_HOST = "9.9.9.9";
+ private static final int HEADER_SOURCE_PORT = 9999;
+ private static final String HEADER_DESTINATION_HOST = "8.8.8.8";
+ private static final int HEADER_DESTINATION_PORT = 8888;
+ private static final String REMOTE_ADDRESS_TO_VERIFY = HEADER_SOURCE_HOST +
":" + HEADER_SOURCE_PORT;
+ private static final String PROXY_ADDRESS_TO_VERIFY =
HEADER_DESTINATION_HOST + ":" + HEADER_DESTINATION_PORT;
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ server = createServer(false, createDefaultNettyConfig()
+ .clearAcceptorConfigurations()
+ .addAcceptorConfiguration("standard", "tcp://127.0.0.1:" +
BROKER_STANDARD_PORT + "?protocols=CORE,AMQP")
+ .addAcceptorConfiguration("proxyEnabled", "tcp://127.0.0.1:" +
BROKER_PROXY_PORT + "?proxyProtocolEnabled=true")
+ .addAcceptorConfiguration("proxyAndSslEnabled", "tcp://127.0.0.1:" +
BROKER_PROXY_SSL_PORT +
"?proxyProtocolEnabled=true;sslEnabled=true;protocols=CORE,AMQP,MQTT,OPENWIRE;supportAdvisory=false;suppressInternalManagementObjects=true;keyStorePath=server-keystore.jks;keyStorePassword=securepass"));
+
+ server.start();
+
+ return server;
+ }
+
+ /*
+ * a non-proxied connection shouldn't be able to connect to an acceptor
using proxyEnabled=true
+ */
+ @Test
+ public void testNonProxiedConnectionToProxyAcceptor() {
+ testFailure(() -> createConnection(BROKER_PROXY_PORT));
+ }
+
+ /*
+ * a proxied connection shouldn't be able to connect to an acceptor using
proxyEnabled=false
+ */
+ @Test
+ public void testProxiedV1ConnectionToNonProxyAcceptor() throws Exception {
+ testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion.V1);
+ }
+
+ /*
+ * a proxied connection shouldn't be able to connect to an acceptor using
proxyEnabled=false
+ */
+ @Test
+ public void testProxiedV2ConnectionToNonProxyAcceptor() throws Exception {
+ testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion.V2);
+ }
+
+ public void testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion
version) throws Exception {
+ NettyHAProxyServer proxy = new NettyHAProxyServer()
+ .setBackEndPort(BROKER_STANDARD_PORT)
+ .setSendProxyHeader(true)
+ .setProxyProtocolVersion(version)
+ .setHeaderSourceHost(HEADER_SOURCE_HOST)
+ .setHeaderSourcePort(HEADER_SOURCE_PORT)
+ .setHeaderDestinationHost(HEADER_DESTINATION_HOST)
+ .setHeaderDestinationPort(HEADER_DESTINATION_PORT)
+ .start();
+ runAfter(proxy::stop);
+ testFailure(() -> createConnection(proxy.getFrontendPortInUse()));
+ }
+
+ private void testFailure(ConnectionSupplier cf) {
+ assertThrows(JMSException.class, () -> {
+ assertTimeout(Duration.ofMillis(2000), () -> testSendReceive(cf,
null));
+ });
+ }
+
+ @Test
+ public void testSendReceiveCoreV1() throws Exception {
+ testSendReceiveCore(HAProxyProtocolVersion.V1);
+ }
+
+ @Test
+ public void testSendReceiveCoreV2() throws Exception {
+ testSendReceiveCore(HAProxyProtocolVersion.V2);
+ }
+
+ private void testSendReceiveCore(HAProxyProtocolVersion version) throws
Exception {
+ int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+ testSendReceive(() -> createCoreConnection(proxyPort), version);
+ }
+
+ @Test
+ public void testSendReceiveAMQPV1() throws Exception {
+ testSendReceiveAMQP(HAProxyProtocolVersion.V1);
+ }
+
+ @Test
+ public void testSendReceiveAMQPV2() throws Exception {
+ testSendReceiveAMQP(HAProxyProtocolVersion.V2);
+ }
+
+ private void testSendReceiveAMQP(HAProxyProtocolVersion version) throws
Exception {
+ int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+ testSendReceive(() -> createConnection(proxyPort), version);
+ }
+
+ @Test
+ public void testSendReceiveOpenWireV1() throws Exception {
+ testSendReceiveOpenWire(HAProxyProtocolVersion.V1);
+ }
+
+ @Test
+ public void testSendReceiveOpenWireV2() throws Exception {
+ testSendReceiveOpenWire(HAProxyProtocolVersion.V2);
+ }
+
+ private void testSendReceiveOpenWire(HAProxyProtocolVersion version) throws
Exception {
+ int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+ testSendReceive(() -> createOpenWireConnection(proxyPort), version);
+ }
+
+ @Test
+ public void testSendReceiveCoreV1Ssl() throws Exception {
+ testSendReceiveCoreSsl(HAProxyProtocolVersion.V1);
+ }
+
+ @Test
+ public void testSendReceiveCoreV2Ssl() throws Exception {
+ testSendReceiveCoreSsl(HAProxyProtocolVersion.V2);
+ }
+
+ private void testSendReceiveCoreSsl(HAProxyProtocolVersion version) throws
Exception {
+ int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+ testSendReceive(() -> createCoreConnection("tcp://localhost:" +
proxyPort +
"?sslEnabled=true;trustStorePath=server-ca-truststore.jks;trustStorePassword=securepass",
null, null, null, true), version);
+ }
+
+ @Test
+ public void testSendReceiveAmqpV1Ssl() throws Exception {
+ testSendReceiveAmqpSsl(HAProxyProtocolVersion.V1);
+ }
+
+ @Test
+ public void testSendReceiveAmqpV2Ssl() throws Exception {
+ testSendReceiveAmqpSsl(HAProxyProtocolVersion.V2);
+ }
+
+ private void testSendReceiveAmqpSsl(HAProxyProtocolVersion version) throws
Exception {
+ int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+ URL truststorePath =
Thread.currentThread().getContextClassLoader().getResource("server-ca-truststore.jks");
+ assertNotNull(truststorePath, "Truststore file not found on classpath");
+ String truststore = truststorePath.getPath();
+ URI uri = new URI("amqps://localhost:" + proxyPort +
"?transport.trustStoreLocation=" + truststore +
"&transport.trustStorePassword=securepass");
+ testSendReceive(() -> createConnection(uri, null, null, null, true),
version);
+ }
+
+ @Test
+ public void testSendReceiveOpenWireV1Ssl() throws Exception {
+ testSendReceiveOpenWireSsl(HAProxyProtocolVersion.V1);
+ }
+
+ @Test
+ public void testSendReceiveOpenWireV2Ssl() throws Exception {
+ testSendReceiveOpenWireSsl(HAProxyProtocolVersion.V1);
+ }
+
+ private void testSendReceiveOpenWireSsl(HAProxyProtocolVersion version)
throws Exception {
+ int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+ testSendReceive(() -> {
+ ActiveMQSslConnectionFactory cf = new
ActiveMQSslConnectionFactory("ssl://localhost:" + proxyPort);
+ try {
+ cf.setTrustStore("server-ca-truststore.jks");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ cf.setKeyStorePassword("securepass");
+ return cf.createConnection();
+ }, version);
+ }
+
+ private void testSendReceive(ConnectionSupplier cf, HAProxyProtocolVersion
version) throws Exception {
+ int numberOfMessages = 100;
+
+ for (int dest = 0; dest < 5; dest++) {
+ Connection producerConnection = cf.createConnection();
+ Wait.assertTrue(() -> verifyProxyConnectionCount(1, version));
+ Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("queue.test" + dest);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ producer.send(session.createTextMessage("hello " + i));
+ }
+
+ Connection consumerConnection = cf.createConnection();
+ Wait.assertTrue(() -> verifyProxyConnectionCount(2, version));
+ Session sessionConsumer = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queueConsumer = sessionConsumer.createQueue("queue.test" +
dest);
+ MessageConsumer consumer =
sessionConsumer.createConsumer(queueConsumer);
+ consumerConnection.start();
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ Message message = consumer.receive(5000);
+ assertNotNull(message);
+ }
+
+ producerConnection.close();
+ consumerConnection.close();
+ }
+ }
+
+ @Test
+ public void testSendReceiveMqttV1() throws Exception {
+ testSendReceiveMqtt(HAProxyProtocolVersion.V1);
+ }
+
+ @Test
+ public void testSendReceiveMqttV2() throws Exception {
+ testSendReceiveMqtt(HAProxyProtocolVersion.V2);
+ }
+
+ private void testSendReceiveMqtt(HAProxyProtocolVersion version) throws
Exception {
+ final int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+ final String url = "tcp://localhost:" + proxyPort;
+ String topic = RandomUtil.randomUUIDString();
+ MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ MqttClient subscriber = new MqttClient(url, "subscriber", new
MemoryPersistence());
+ subscriber.connect(connectionOptions);
+ subscriber.setCallback(new MqttCallback() {
+ @Override
+ public void disconnected(MqttDisconnectResponse
mqttDisconnectResponse) {
+ }
+
+ @Override
+ public void mqttErrorOccurred(MqttException e) {
+ }
+
+ @Override
+ public void deliveryComplete(IMqttToken iMqttToken) {
+ }
+
+ @Override
+ public void connectComplete(boolean b, String s) {
+ }
+
+ @Override
+ public void authPacketArrived(int i, MqttProperties mqttProperties) {
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ logger.info("Message received from topic {}, message={}", topic,
message);
+ latch.countDown();
+ }
+ });
+ subscriber.subscribe(topic, AT_LEAST_ONCE);
+
+ MqttClient producer = new MqttClient(url, "producer", new
MemoryPersistence());
+ producer.connect(connectionOptions);
+ producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1,
false);
+ assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+ Wait.assertTrue(() -> verifyProxyConnectionCount(2, version));
+ subscriber.disconnect();
+ producer.disconnect();
+ }
+
+ private int startProxy(HAProxyProtocolVersion version, int backEndPort) {
+ NettyHAProxyServer proxy = new NettyHAProxyServer()
+ .setBackEndPort(backEndPort)
+ .setProxyProtocolVersion(version)
+ .setHeaderSourceHost(HEADER_SOURCE_HOST)
+ .setHeaderSourcePort(HEADER_SOURCE_PORT)
+ .setHeaderDestinationHost(HEADER_DESTINATION_HOST)
+ .setHeaderDestinationPort(HEADER_DESTINATION_PORT)
+ .start();
+ runAfter(proxy::stop);
+ return proxy.getFrontendPortInUse();
+ }
+
+ private boolean verifyProxyConnectionCount(int expectedConnections,
HAProxyProtocolVersion version) throws Exception {
+ // this connection goes directly to the broker so it won't be counted as
a proxy connection
+ try (ActiveMQQueueConnectionFactory cf = new
ActiveMQQueueConnectionFactory("tcp://localhost:" + BROKER_STANDARD_PORT);
+ QueueConnection c = cf.createQueueConnection()) {
+ QueueSession s = c.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue managementQueue =
ActiveMQJMSClient.createQueue("activemq.management");
+ QueueRequestor requestor = new QueueRequestor(s, managementQueue);
+ c.start();
+ Message m = s.createMessage();
+ String filter = createJsonFilter("proxyProtocolVersion",
ActiveMQFilterPredicate.Operation.EQUALS.toString(), version.toString());
+ JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER,
"listConnections", filter, 1, 50);
+ JsonObject result = JsonUtil.readJsonObject((String)
JMSManagementHelper.getResult(requestor.request(m), String.class));
+ if (expectedConnections != result.getJsonNumber("count").intValue()) {
+ return false;
+ }
+ JsonArray connections = result.getJsonArray("data");
+ for (int i = 0; i < expectedConnections; i++) {
+ if
(!connections.getJsonObject(i).getString("proxyAddress").equals(PROXY_ADDRESS_TO_VERIFY))
{
+ return false;
+ }
+ if
(!connections.getJsonObject(i).getString("remoteAddress").equals(REMOTE_ADDRESS_TO_VERIFY))
{
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ @Test
+ @Timeout(30)
+ public void testBrokerHandlesOutOfOrderDeliveryIdInTransfer() throws
Exception {
Review Comment:
I didn't even look at the name. :laughing:
Fixed now with your suggestion.
Issue Time Tracking
-------------------
Worklog Id: (was: 984826)
Time Spent: 12h 40m (was: 12.5h)
> Support PROXY Protocol
> ----------------------
>
> Key: ARTEMIS-3915
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3915
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: Broker
> Reporter: João Santos
> Assignee: Justin Bertram
> Priority: Major
> Labels: pull-request-available
> Time Spent: 12h 40m
> Remaining Estimate: 0h
>
> [HAProxy|http://www.haproxy.org/] is a widely known and used TCP Load
> Balancer and especially useful for an ActiveMQ Artemis clustered environment.
> Although possible to functionally implement with both products current
> features, Artemis does not support the PROXY protocol, which prevents it's
> broker nodes from inferring the real remote client IP address when behind an
> HAProxy instance.
> Since Netty sockets implementation already seems to support this protocol
> (discussed w/ [~jbertram] on DEV mailing list), it shouldn't be a big leap to
> adding support for the protocol on Artemis acceptors, thus improving the
> deployment of the use case at hand.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact