This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6949ad091b7 [improve] [proxy] PIP-395: Add Proxy configuration to
support configurable response headers for http reverse-proxy (#23649)
6949ad091b7 is described below
commit 6949ad091b77a6e06d20b695b8d1673a7d310272
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Feb 13 16:10:14 2025 -0800
[improve] [proxy] PIP-395: Add Proxy configuration to support configurable
response headers for http reverse-proxy (#23649)
---
conf/proxy.conf | 4 ++
.../pulsar/proxy/server/ProxyConfiguration.java | 7 +++
.../org/apache/pulsar/proxy/server/WebServer.java | 51 ++++++++++++++++++++++
.../proxy/server/ProxyAdditionalServletTest.java | 16 ++++++-
4 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 6e6c960e800..567cc0772a3 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,10 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained
when getting topic statistics data.
haProxyProtocolEnabled=false
+# Default HTTP response headers, given as a JSON map, that the HTTP proxy adds
to its responses (for example, to meet security requirements).
+# e.g. {"header1":"value"}
+proxyHttpResponseHeadersJson=
+
# Enable or disable the use of HA proxy protocol for resolving the client IP
for http/https requests.
webServiceHaProxyProtocolEnabled=false
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index b9360e403f6..329e6d52ba6 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -818,6 +818,13 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private Set<String> proxyAdditionalServlets = new TreeSet<>();
+ @FieldContext(
+ category = CATEGORY_PLUGIN,
+ doc = "Default http header map to add into http-proxy for the any
security requirements "
+ + "eg: { \"header1\": \"val1\", \"header2\": \"val2\" }"
+ )
+ private String proxyHttpResponseHeadersJson;
+
@FieldContext(
category = CATEGORY_PLUGIN,
doc = "List of proxy additional servlet to load, which is a list
of proxy additional servlet names"
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 3c472135bdf..7591b8b54db 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.server;
import static
org.apache.pulsar.proxy.server.AdminProxyHandler.INIT_PARAM_REQUEST_BUFFER_SIZE;
+import com.fasterxml.jackson.core.JsonProcessingException;
import io.opentelemetry.api.OpenTelemetry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.io.IOException;
@@ -27,12 +28,22 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
@@ -41,6 +52,7 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
@@ -242,6 +254,7 @@ public class WebServer {
ServletContextHandler context = new
ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
context.addServlet(servletHolder, MATCH_ALL);
+ context.addFilter(new FilterHolder(new CustomHeaderFilter(config)),
"/*", null);
for (Pair<String, Object> attribute : attributes) {
context.setAttribute(attribute.getLeft(), attribute.getRight());
}
@@ -420,5 +433,43 @@ public class WebServer {
}
}
+ static class CustomHeaderFilter implements Filter {
+
+ Map<String, String> defaultHeaders = new HashMap<>();
+
+ public CustomHeaderFilter(ProxyConfiguration config) {
+ String headerJson = config.getProxyHttpResponseHeadersJson();
+ if (StringUtils.isNotBlank(headerJson)) {
+ try {
+ defaultHeaders =
ObjectMapperFactory.getMapper().getObjectMapper().readerFor(Map.class)
+ .readValue(headerJson);
+ } catch (JsonProcessingException e) {
+ log.warn("Failed to deserialize json headers {}",
headerJson, e);
+ }
+ }
+ }
+
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {
+ }
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws IOException, ServletException {
+ if (defaultHeaders != null && response instanceof
HttpServletResponse) {
+ HttpServletResponse httpResponse = (HttpServletResponse)
response;
+ defaultHeaders.forEach((header, value) -> {
+ httpResponse.setHeader(header, value);
+ });
+
+ }
+ chain.doFilter(request, response);
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(WebServer.class);
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index e12224da371..cebec274894 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -28,6 +28,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
@@ -68,6 +69,7 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
private WebServer proxyWebServer;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
private Authentication proxyClientAuthentication;
+ private Map<String, String> responseHeaders = new HashMap<>();
@Override
@BeforeClass
@@ -82,6 +84,9 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
// enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.of(2));
proxyConfig.setClusterName(configClusterName);
+ responseHeaders.put("header1", "value1");
+ proxyConfig.setProxyHttpResponseHeadersJson(
+
ObjectMapperFactory.getMapper().writer().writeValueAsString(responseHeaders));
// this is for nar package test
// addServletNar();
@@ -198,11 +203,15 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
int httpPort = proxyWebServer.getListenPortHTTP().get();
log.info("proxy service httpPort {}", httpPort);
String paramValue = "value - " + RandomUtils.nextInt();
- String response = httpGet("http://localhost:" + httpPort + BASE_PATH +
"?" + QUERY_PARAM + "=" + paramValue);
+ final Map<String, String> headers = new HashMap<>();
+ String response = httpGet("http://localhost:" + httpPort + BASE_PATH +
"?" + QUERY_PARAM + "=" + paramValue,
+ headers);
Assert.assertEquals(response, paramValue);
+ String headerKey = "header1";
+ Assert.assertEquals(headers.get(headerKey),
responseHeaders.get(headerKey));
}
- String httpGet(String url) throws IOException {
+ String httpGet(String url, Map<String, String> headers) throws IOException
{
OkHttpClient client = new OkHttpClient();
okhttp3.Request request = new okhttp3.Request.Builder()
.get()
@@ -210,6 +219,9 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
.build();
try (Response response = client.newCall(request).execute()) {
+ response.headers().forEach(pair -> {
+ headers.put(pair.getFirst(), pair.getSecond());
+ });
return response.body().string();
}
}