rhauch commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r421758877



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, 
String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the 
form "
+                + "'[action] [header name]:[header value]' and optionally 
surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }

Review comment:
       I think this method should be on `ResponseHttpHeadersValidator`, not the 
`WorkerConfig`.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -54,17 +55,40 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.ADMIN_LISTENERS_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*"})
 public class RestServerTest {
+    protected static final String WHITESPACE = " \t \n \r ";
+    protected static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
+            "add \t Cache-Control: no-cache, no-store, must-revalidate",
+            "add \r X-XSS-Protection: 1; mode=block",
+            "\n add Strict-Transport-Security: max-age=31536000; 
includeSubDomains",
+            "AdD   Strict-Transport-Security:  \r  max-age=31536000;  
includeSubDomains",
+            "AdD \t Strict-Transport-Security : \n   max-age=31536000;  
includeSubDomains",
+            "add X-Content-Type-Options: \r nosniff"

Review comment:
       What about adding valid configs for `set`, `addDate` and `setDate`, too?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, 
String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the 
form "
+                + "'[action] [header name]:[header value]' and optionally 
surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }
+
+    public static void validateHttpResponseHeaderConfig(String config) {

Review comment:
       This could be package-private (no access modifier), right? The only 
places it should be called are `ResponseHttpHeadersValidator` and the tests.
   ```suggestion
       // Visible for testing
       static void validateHttpResponseHeaderConfig(String config) {
   ```

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: 
no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, 
must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  
{
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "set add X-XSS-Protection: 1";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedAction() {
+        String headerConfig = "X-Frame-Options: DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderName() {
+        String headerConfig = "add :DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderValue() {
+        String headerConfig = "add X-Frame-Options";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, 
Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                
ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", 
"b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), 
server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), 
v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();
+    }
+

Review comment:
       Looks good. I like the additional checking that you're doing here.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +418,80 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: 
no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, 
must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  
{
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testInvalidHeaderConfigs() {
+        for (String config : INVALID_HEADER_CONFIGS) {
+            assertInvalidHeaderConfig(config);
+        }
+    }
+
+    @Test
+    public void testValidHeaderConfigs() {
+        for (String config : VALID_HEADER_CONFIGS) {
+            assertValidHeaderConfig(config);
+        }
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, 
Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, 
headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                
ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", 
"b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), 
server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), 
v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();

Review comment:
       How about putting the `server.stop()` and `server = null` in a finally 
block? Also, `CloseableHttpClient` and `CloseableHttpResponse` are both 
`AutoCloseable`, so we could use try-with-resources for each of them here:
   ```suggestion
           server = new RestServer(workerConfig);
           try {
             server.initializeServer();
             server.initializeResources(herder);
             HttpRequest request = new HttpGet("/connectors");
             try (CloseableHttpClient httpClient = HttpClients.createMinimal()) 
{
               HttpHost httpHost = new 
HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
               try (CloseableHttpResponse response = 
httpClient.execute(httpHost, request)) {
                 Assert.assertEquals(200, 
response.getStatusLine().getStatusCode());
                 if (!headerConfig.isEmpty()) {
                     expectedHeaders.forEach((k, v) ->
                             
Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
                 } else {
                     
Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
                 }
               }
             }
           } finally {
             server.stop();
             server = null;
           }
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, 
String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the 
form "
+                + "'[action] [header name]:[header value]' and optionally 
surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }
+
+    public static void validateHttpResponseHeaderConfig(String config) {
+        try {
+            // validate format
+            String[] configTokens = config.trim().split("\\s+", 2);
+            if (configTokens.length != 2) {
+                throw new ConfigException(String.format("Invalid format of 
header config '%s\'. "
+                        + "Expected: '[ation] [header name]:[header value]'", 
config));
+            }
+
+            // validate action
+            String method = configTokens[0].trim();
+            validateHeaderConfigAction(method);
+
+            // validate header name and header value pair
+            String header = configTokens[1];
+            String[] headerTokens = header.trim().split(":");
+            if (headerTokens.length != 2) {
+                throw new ConfigException(
+                        String.format("Invalid format of header name and 
header value pair '%s'. "
+                                + "Expected: '[header name]:[header value]'", 
header));
+            }
+
+            // validate header name
+            String headerName = headerTokens[0].trim();
+            if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
+                throw new ConfigException(String.format("Invalid header name 
'%s'. "
+                        + "The '[header name]' cannot contain whitespace", 
headerName));
+            }
+        } catch (ArrayIndexOutOfBoundsException e) {
+            throw new ConfigException(String.format("Invalid header config 
'%s'.", config), e);
+        }
+    }
+
+    public static void validateHeaderConfigAction(String action) {

Review comment:
       This should be package-private (no access modifier):
   ```suggestion
       // Visible for testing
       static void validateHeaderConfigAction(String action) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to