This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6949ad091b7 [improve] [proxy] PIP-395: Add Proxy configuration to support configurable response headers for http reverse-proxy (#23649)
6949ad091b7 is described below

commit 6949ad091b77a6e06d20b695b8d1673a7d310272
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Feb 13 16:10:14 2025 -0800

    [improve] [proxy] PIP-395: Add Proxy configuration to support configurable response headers for http reverse-proxy (#23649)
---
 conf/proxy.conf                                    |  4 ++
 .../pulsar/proxy/server/ProxyConfiguration.java    |  7 +++
 .../org/apache/pulsar/proxy/server/WebServer.java  | 51 ++++++++++++++++++++++
 .../proxy/server/ProxyAdditionalServletTest.java   | 16 ++++++-
 4 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 6e6c960e800..567cc0772a3 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,10 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Default http header map to add into http-proxy for the any security requirements.
+# eg: {"header1":"value"}
+proxyHttpResponseHeadersJson=
+
 # Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
 webServiceHaProxyProtocolEnabled=false
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index b9360e403f6..329e6d52ba6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -818,6 +818,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private Set<String> proxyAdditionalServlets = new TreeSet<>();
 
+    @FieldContext(
+            category = CATEGORY_PLUGIN,
+            doc = "Default http header map to add into http-proxy for the any security requirements "
+                    + "eg: { \"header1\": \"val1\", \"header2\": \"val2\" }"
+    )
+    private String proxyHttpResponseHeadersJson;
+
     @FieldContext(
             category = CATEGORY_PLUGIN,
             doc = "List of proxy additional servlet to load, which is a list of proxy additional servlet names"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 3c472135bdf..7591b8b54db 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.apache.pulsar.proxy.server.AdminProxyHandler.INIT_PARAM_REQUEST_BUFFER_SIZE;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import io.opentelemetry.api.OpenTelemetry;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.io.IOException;
@@ -27,12 +28,22 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
@@ -41,6 +52,7 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.PulsarSslConfiguration;
 import org.apache.pulsar.common.util.PulsarSslFactory;
 import org.apache.pulsar.jetty.tls.JettySslContextFactory;
@@ -242,6 +254,7 @@ public class WebServer {
         ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
         context.setContextPath(basePath);
         context.addServlet(servletHolder, MATCH_ALL);
+        context.addFilter(new FilterHolder(new CustomHeaderFilter(config)), "/*", null);
         for (Pair<String, Object> attribute : attributes) {
             context.setAttribute(attribute.getLeft(), attribute.getRight());
         }
@@ -420,5 +433,43 @@ public class WebServer {
         }
     }
 
+    static class CustomHeaderFilter implements Filter {
+
+        Map<String, String> defaultHeaders = new HashMap<>();
+
+        public CustomHeaderFilter(ProxyConfiguration config) {
+            String headerJson = config.getProxyHttpResponseHeadersJson();
+            if (StringUtils.isNotBlank(headerJson)) {
+                try {
+                    defaultHeaders = ObjectMapperFactory.getMapper().getObjectMapper().readerFor(Map.class)
+                            .readValue(headerJson);
+                } catch (JsonProcessingException e) {
+                    log.warn("Failed to deserialize json headers {}", headerJson, e);
+                }
+            }
+        }
+
+        @Override
+        public void init(FilterConfig filterConfig) throws ServletException {
+        }
+
+        @Override
+        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+                throws IOException, ServletException {
+            if (defaultHeaders != null && response instanceof HttpServletResponse) {
+                HttpServletResponse httpResponse = (HttpServletResponse) response;
+                defaultHeaders.forEach((header, value) -> {
+                    httpResponse.setHeader(header, value);
+                });
+
+            }
+            chain.doFilter(request, response);
+        }
+
+        @Override
+        public void destroy() {
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(WebServer.class);
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index e12224da371..cebec274894 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
@@ -68,6 +69,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
     private WebServer proxyWebServer;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
     private Authentication proxyClientAuthentication;
+    private Map<String, String> responseHeaders = new HashMap<>();
 
     @Override
     @BeforeClass
@@ -82,6 +84,9 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
         // enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.of(2));
         proxyConfig.setClusterName(configClusterName);
+        responseHeaders.put("header1", "value1");
+        proxyConfig.setProxyHttpResponseHeadersJson(
+                ObjectMapperFactory.getMapper().writer().writeValueAsString(responseHeaders));
 
         // this is for nar package test
 //        addServletNar();
@@ -198,11 +203,15 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
         int httpPort = proxyWebServer.getListenPortHTTP().get();
         log.info("proxy service httpPort {}", httpPort);
         String paramValue = "value - " + RandomUtils.nextInt();
-        String response = httpGet("http://localhost:" + httpPort + BASE_PATH + "?" + QUERY_PARAM + "=" + paramValue);
+        final Map<String, String> headers = new HashMap<>();
+        String response = httpGet("http://localhost:" + httpPort + BASE_PATH + "?" + QUERY_PARAM + "=" + paramValue,
+                headers);
         Assert.assertEquals(response, paramValue);
+        String headerKey = "header1";
+        Assert.assertEquals(headers.get(headerKey), responseHeaders.get(headerKey));
     }
 
-    String httpGet(String url) throws IOException {
+    String httpGet(String url, Map<String, String> headers) throws IOException {
         OkHttpClient client = new OkHttpClient();
         okhttp3.Request request = new okhttp3.Request.Builder()
                 .get()
@@ -210,6 +219,9 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
                 .build();
 
         try (Response response = client.newCall(request).execute()) {
+            response.headers().forEach(pair -> {
+                headers.put(pair.getFirst(), pair.getSecond());
+            });
             return response.body().string();
         }
     }

Reply via email to