[ 
https://issues.apache.org/jira/browse/ARTEMIS-3915?focusedWorklogId=984825&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-984825
 ]

ASF GitHub Bot logged work on ARTEMIS-3915:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Sep/25 19:55
            Start Date: 26/Sep/25 19:55
    Worklog Time Spent: 10m 
      Work Description: tabish121 commented on code in PR #5908:
URL: https://github.com/apache/activemq-artemis/pull/5908#discussion_r2383365994


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proxyprotocol/HAProxyTest.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proxyprotocol;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueRequestor;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper;
+import 
org.apache.activemq.artemis.core.management.impl.view.predicate.ActiveMQFilterPredicate;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;
+import org.apache.activemq.artemis.json.JsonArray;
+import org.apache.activemq.artemis.json.JsonObject;
+import 
org.apache.activemq.artemis.tests.integration.jms.multiprotocol.MultiprotocolJMSClientTestSupport;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.netty.NettyHAProxyServer;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
+import org.eclipse.paho.mqttv5.client.IMqttToken;
+import org.eclipse.paho.mqttv5.client.MqttCallback;
+import org.eclipse.paho.mqttv5.client.MqttClient;
+import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
+import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
+import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.AT_LEAST_ONCE;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HAProxyTest extends MultiprotocolJMSClientTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final int BROKER_PROXY_PORT = 61617;
+   private static final int BROKER_PROXY_SSL_PORT = 61618;
+   private static final int BROKER_STANDARD_PORT = 61616;
+
+   // the following fake IPs and ports are injected by the NettyHAProxyServer 
to verify functionality on the broker
+   private static final String HEADER_SOURCE_HOST = "9.9.9.9";
+   private static final int HEADER_SOURCE_PORT = 9999;
+   private static final String HEADER_DESTINATION_HOST = "8.8.8.8";
+   private static final int HEADER_DESTINATION_PORT = 8888;
+   private static final String REMOTE_ADDRESS_TO_VERIFY = HEADER_SOURCE_HOST + 
":" + HEADER_SOURCE_PORT;
+   private static final String PROXY_ADDRESS_TO_VERIFY = 
HEADER_DESTINATION_HOST + ":" + HEADER_DESTINATION_PORT;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      server = createServer(false, createDefaultNettyConfig()
+         .clearAcceptorConfigurations()
+         .addAcceptorConfiguration("standard", "tcp://127.0.0.1:" + 
BROKER_STANDARD_PORT + "?protocols=CORE,AMQP")
+         .addAcceptorConfiguration("proxyEnabled", "tcp://127.0.0.1:" + 
BROKER_PROXY_PORT + "?proxyProtocolEnabled=true")
+         .addAcceptorConfiguration("proxyAndSslEnabled", "tcp://127.0.0.1:" + 
BROKER_PROXY_SSL_PORT + 
"?proxyProtocolEnabled=true;sslEnabled=true;protocols=CORE,AMQP,MQTT,OPENWIRE;supportAdvisory=false;suppressInternalManagementObjects=true;keyStorePath=server-keystore.jks;keyStorePassword=securepass"));
+
+      server.start();
+
+      return server;
+   }
+
+   /*
+    * a non-proxied connection shouldn't be able to connect to an acceptor 
using proxyEnabled=true
+    */
+   @Test
+   public void testNonProxiedConnectionToProxyAcceptor() {
+      testFailure(() -> createConnection(BROKER_PROXY_PORT));
+   }
+
+   /*
+    * a proxied connection shouldn't be able to connect to an acceptor using 
proxyEnabled=false
+    */
+   @Test
+   public void testProxiedV1ConnectionToNonProxyAcceptor() throws Exception {
+      testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion.V1);
+   }
+
+   /*
+    * a proxied connection shouldn't be able to connect to an acceptor using 
proxyEnabled=false
+    */
+   @Test
+   public void testProxiedV2ConnectionToNonProxyAcceptor() throws Exception {
+      testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion.V2);
+   }
+
+   public void testProxiedConnectionToNonProxyAcceptor(HAProxyProtocolVersion 
version) throws Exception {
+      NettyHAProxyServer proxy = new NettyHAProxyServer()
+         .setBackEndPort(BROKER_STANDARD_PORT)
+         .setSendProxyHeader(true)
+         .setProxyProtocolVersion(version)
+         .setHeaderSourceHost(HEADER_SOURCE_HOST)
+         .setHeaderSourcePort(HEADER_SOURCE_PORT)
+         .setHeaderDestinationHost(HEADER_DESTINATION_HOST)
+         .setHeaderDestinationPort(HEADER_DESTINATION_PORT)
+         .start();
+      runAfter(proxy::stop);
+      testFailure(() -> createConnection(proxy.getFrontendPortInUse()));
+   }
+
+   private void testFailure(ConnectionSupplier cf) {
+      assertThrows(JMSException.class, () -> {
+         assertTimeout(Duration.ofMillis(2000), () -> testSendReceive(cf, 
null));
+      });
+   }
+
+   @Test
+   public void testSendReceiveCoreV1() throws Exception {
+      testSendReceiveCore(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveCoreV2() throws Exception {
+      testSendReceiveCore(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveCore(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      testSendReceive(() -> createCoreConnection(proxyPort), version);
+   }
+
+   @Test
+   public void testSendReceiveAMQPV1() throws Exception {
+      testSendReceiveAMQP(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveAMQPV2() throws Exception {
+      testSendReceiveAMQP(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveAMQP(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      testSendReceive(() -> createConnection(proxyPort), version);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV1() throws Exception {
+      testSendReceiveOpenWire(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV2() throws Exception {
+      testSendReceiveOpenWire(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveOpenWire(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      testSendReceive(() -> createOpenWireConnection(proxyPort), version);
+   }
+
+   @Test
+   public void testSendReceiveCoreV1Ssl() throws Exception {
+      testSendReceiveCoreSsl(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveCoreV2Ssl() throws Exception {
+      testSendReceiveCoreSsl(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveCoreSsl(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+      testSendReceive(() -> createCoreConnection("tcp://localhost:" + 
proxyPort + 
"?sslEnabled=true;trustStorePath=server-ca-truststore.jks;trustStorePassword=securepass",
 null, null, null, true), version);
+   }
+
+   @Test
+   public void testSendReceiveAmqpV1Ssl() throws Exception {
+      testSendReceiveAmqpSsl(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveAmqpV2Ssl() throws Exception {
+      testSendReceiveAmqpSsl(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveAmqpSsl(HAProxyProtocolVersion version) throws 
Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+      URL truststorePath = 
Thread.currentThread().getContextClassLoader().getResource("server-ca-truststore.jks");
+      assertNotNull(truststorePath, "Truststore file not found on classpath");
+      String truststore = truststorePath.getPath();
+      URI uri = new URI("amqps://localhost:" + proxyPort + 
"?transport.trustStoreLocation=" + truststore + 
"&transport.trustStorePassword=securepass");
+      testSendReceive(() -> createConnection(uri, null, null, null, true), 
version);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV1Ssl() throws Exception {
+      testSendReceiveOpenWireSsl(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveOpenWireV2Ssl() throws Exception {
+      testSendReceiveOpenWireSsl(HAProxyProtocolVersion.V1);
+   }
+
+   private void testSendReceiveOpenWireSsl(HAProxyProtocolVersion version) 
throws Exception {
+      int proxyPort = startProxy(version, BROKER_PROXY_SSL_PORT);
+      testSendReceive(() -> {
+         ActiveMQSslConnectionFactory cf = new 
ActiveMQSslConnectionFactory("ssl://localhost:" + proxyPort);
+         try {
+            cf.setTrustStore("server-ca-truststore.jks");
+         } catch (Exception e) {
+            throw new RuntimeException(e);
+         }
+         cf.setKeyStorePassword("securepass");
+         return cf.createConnection();
+      }, version);
+   }
+
+   private void testSendReceive(ConnectionSupplier cf, HAProxyProtocolVersion 
version) throws Exception {
+      int numberOfMessages = 100;
+
+      for (int dest = 0; dest < 5; dest++) {
+         Connection producerConnection = cf.createConnection();
+         Wait.assertTrue(() -> verifyProxyConnectionCount(1, version));
+         Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue("queue.test" + dest);
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+         }
+
+         Connection consumerConnection = cf.createConnection();
+         Wait.assertTrue(() -> verifyProxyConnectionCount(2, version));
+         Session sessionConsumer = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Queue queueConsumer = sessionConsumer.createQueue("queue.test" + 
dest);
+         MessageConsumer consumer = 
sessionConsumer.createConsumer(queueConsumer);
+         consumerConnection.start();
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+         }
+
+         producerConnection.close();
+         consumerConnection.close();
+      }
+   }
+
+   @Test
+   public void testSendReceiveMqttV1() throws Exception {
+      testSendReceiveMqtt(HAProxyProtocolVersion.V1);
+   }
+
+   @Test
+   public void testSendReceiveMqttV2() throws Exception {
+      testSendReceiveMqtt(HAProxyProtocolVersion.V2);
+   }
+
+   private void testSendReceiveMqtt(HAProxyProtocolVersion version) throws 
Exception {
+      final int proxyPort = startProxy(version, BROKER_PROXY_PORT);
+      final String url = "tcp://localhost:" + proxyPort;
+      String topic = RandomUtil.randomUUIDString();
+      MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      MqttClient subscriber = new MqttClient(url, "subscriber", new 
MemoryPersistence());
+      subscriber.connect(connectionOptions);
+      subscriber.setCallback(new MqttCallback() {
+         @Override
+         public void disconnected(MqttDisconnectResponse 
mqttDisconnectResponse) {
+         }
+
+         @Override
+         public void mqttErrorOccurred(MqttException e) {
+         }
+
+         @Override
+         public void deliveryComplete(IMqttToken iMqttToken) {
+         }
+
+         @Override
+         public void connectComplete(boolean b, String s) {
+         }
+
+         @Override
+         public void authPacketArrived(int i, MqttProperties mqttProperties) {
+         }
+
+         @Override
+         public void messageArrived(String topic, MqttMessage message) {
+            logger.info("Message received from topic {}, message={}", topic, 
message);
+            latch.countDown();
+         }
+      });
+      subscriber.subscribe(topic, AT_LEAST_ONCE);
+
+      MqttClient producer = new MqttClient(url, "producer", new 
MemoryPersistence());
+      producer.connect(connectionOptions);
+      producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1, 
false);
+      assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+      Wait.assertTrue(() -> verifyProxyConnectionCount(2, version));
+      subscriber.disconnect();
+      producer.disconnect();
+   }
+
+   private int startProxy(HAProxyProtocolVersion version, int backEndPort) {
+      NettyHAProxyServer proxy = new NettyHAProxyServer()
+         .setBackEndPort(backEndPort)
+         .setProxyProtocolVersion(version)
+         .setHeaderSourceHost(HEADER_SOURCE_HOST)
+         .setHeaderSourcePort(HEADER_SOURCE_PORT)
+         .setHeaderDestinationHost(HEADER_DESTINATION_HOST)
+         .setHeaderDestinationPort(HEADER_DESTINATION_PORT)
+         .start();
+      runAfter(proxy::stop);
+      return proxy.getFrontendPortInUse();
+   }
+
+   private boolean verifyProxyConnectionCount(int expectedConnections, 
HAProxyProtocolVersion version) throws Exception {
+      // this connection goes directly to the broker so it won't be counted as 
a proxy connection
+      try (ActiveMQQueueConnectionFactory cf = new 
ActiveMQQueueConnectionFactory("tcp://localhost:" + BROKER_STANDARD_PORT);
+           QueueConnection c = cf.createQueueConnection()) {
+         QueueSession s = c.createQueueSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Queue managementQueue = 
ActiveMQJMSClient.createQueue("activemq.management");
+         QueueRequestor requestor = new QueueRequestor(s, managementQueue);
+         c.start();
+         Message m = s.createMessage();
+         String filter = createJsonFilter("proxyProtocolVersion", 
ActiveMQFilterPredicate.Operation.EQUALS.toString(), version.toString());
+         JMSManagementHelper.putOperationInvocation(m, ResourceNames.BROKER, 
"listConnections", filter, 1, 50);
+         JsonObject result = JsonUtil.readJsonObject((String) 
JMSManagementHelper.getResult(requestor.request(m), String.class));
+         if (expectedConnections != result.getJsonNumber("count").intValue()) {
+            return false;
+         }
+         JsonArray connections = result.getJsonArray("data");
+         for (int i = 0; i < expectedConnections; i++) {
+            if 
(!connections.getJsonObject(i).getString("proxyAddress").equals(PROXY_ADDRESS_TO_VERIFY))
 {
+               return false;
+            }
+            if 
(!connections.getJsonObject(i).getString("remoteAddress").equals(REMOTE_ADDRESS_TO_VERIFY))
 {
+               return false;
+            }
+         }
+         return true;
+      }
+   }
+
+   @Test
+   @Timeout(30)
+   public void testBrokerHandlesOutOfOrderDeliveryIdInTransfer() throws 
Exception {

Review Comment:
   This test name is not right for what it's doing. I should have renamed it 
when I commented about it; maybe:
   
   `testBrokerHandlesSplitAMQPHeaderBytesDuringConnectWithNoProxyHeader`
   
   I started from an existing test and forgot to change the name.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 984825)
    Time Spent: 12.5h  (was: 12h 20m)

> Support PROXY Protocol
> ----------------------
>
>                 Key: ARTEMIS-3915
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3915
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>          Components: Broker
>            Reporter: João Santos
>            Assignee: Justin Bertram
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> [HAProxy|http://www.haproxy.org/] is a widely known and used TCP load 
> balancer and is especially useful for an ActiveMQ Artemis clustered environment.
> Although it is possible to implement this functionally with both products' 
> current features, Artemis does not support the PROXY protocol, which prevents 
> its broker nodes from inferring the real remote client IP address when behind 
> an HAProxy instance.
> Since Netty's socket implementation already seems to support this protocol 
> (discussed with [~jbertram] on the DEV mailing list), it shouldn't be a big 
> leap to add support for the protocol on Artemis acceptors, thus improving the 
> deployment of the use case at hand.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to