rhauch commented on a change in pull request #8620: URL: https://github.com/apache/kafka/pull/8620#discussion_r420837649
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); Review comment: ~How about using `strValue.split(',')`, and avoiding the need to import `org.eclipse.jetty.util.StringUtil`?~ Okay, I see now that `csvSplit` handles quotes. How about a comment: ```suggestion String[] configs = StringUtil.csvSplit(strValue); // handles and removes surrounding quotes ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); + Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config)); + } + + private static void validateHttpResponseHeaderConfig(String config) { + try { + // validate format + String[] configTokens = config.trim().split("\\s+", 2); + if (configTokens.length != 2) { + throw new ConfigException(String.format("Invalid format of header config \"%s\". 
" + + "Expected: \"[ation] [header name]:[header value]\"", config)); + } + + // validate action + String method = configTokens[0].trim(); + validateHeaderConfigAction(method); + + // validate header name and header value pair + String header = configTokens[1]; + String[] headerTokens = header.trim().split(":"); + if (headerTokens.length != 2) { + throw new ConfigException( + String.format("Invalid format of header name and header value pair \"%s\". " + + "Expected: \"[header name]:[header value]\"", header)); + } + + // validate header name + String headerName = headerTokens[0].trim(); + if (headerName.isEmpty() || headerName.contains(" ")) { + throw new ConfigException(String.format("Invalid header name \"%s\". " + + "The \"[header name]\" cannot contain whitespace", headerName)); Review comment: Nit: ```suggestion throw new ConfigException(String.format("Invalid header name '%s'. " + "The '[header name]' cannot contain whitespace", headerName)); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); + Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config)); + } + + private static void validateHttpResponseHeaderConfig(String config) { + try { + // validate format + String[] configTokens = config.trim().split("\\s+", 2); + if (configTokens.length != 2) { + throw new ConfigException(String.format("Invalid format of header config \"%s\". 
" + + "Expected: \"[ation] [header name]:[header value]\"", config)); + } + + // validate action + String method = configTokens[0].trim(); + validateHeaderConfigAction(method); + + // validate header name and header value pair + String header = configTokens[1]; + String[] headerTokens = header.trim().split(":"); + if (headerTokens.length != 2) { + throw new ConfigException( + String.format("Invalid format of header name and header value pair \"%s\". " + + "Expected: \"[header name]:[header value]\"", header)); + } + + // validate header name + String headerName = headerTokens[0].trim(); + if (headerName.isEmpty() || headerName.contains(" ")) { + throw new ConfigException(String.format("Invalid header name \"%s\". " + + "The \"[header name]\" cannot contain whitespace", headerName)); + } + } catch (ArrayIndexOutOfBoundsException e) { + throw new ConfigException(String.format("Invalid header config \"%s\".", config), e); Review comment: Nit: ```suggestion throw new ConfigException(String.format("Invalid header config '%s'", config), e); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } Review comment: Should probably add this as the first bit in this method: ```suggestion String strValue = (String) value; if (value == null || strValue.trim().isEmpty()) { return; } ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object 
value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); + Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config)); + } + + private static void validateHttpResponseHeaderConfig(String config) { + try { + // validate format + String[] configTokens = config.trim().split("\\s+", 2); + if (configTokens.length != 2) { + throw new ConfigException(String.format("Invalid format of header config \"%s\". " + + "Expected: \"[ation] [header name]:[header value]\"", config)); + } + + // validate action + String method = configTokens[0].trim(); + validateHeaderConfigAction(method); + + // validate header name and header value pair + String header = configTokens[1]; + String[] headerTokens = header.trim().split(":"); + if (headerTokens.length != 2) { + throw new ConfigException( + String.format("Invalid format of header name and header value pair \"%s\". " + + "Expected: \"[header name]:[header value]\"", header)); Review comment: Nit: ```suggestion String.format("Invalid format of header name and header value pair '%s'. " + "Expected: '[header name]:[header value]'", header)); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); + Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config)); + } + + private static void validateHttpResponseHeaderConfig(String config) { Review comment: There's no benefit to having the `validateHttpResponseHeaderConfig(...)` method static, so how about making it non-static? 
And you can use a method reference to make it a tiny bit more readable: ```suggestion Arrays.stream(configs).forEach(this::validateHttpResponseHeaderConfig); } private void validateHttpResponseHeaderConfig(String config) { ``` Or, if you want to make it easier to test, then move to `WorkerConfig` class and make package-level static: ``` Arrays.stream(configs).forEach(WorkerConfig::validateHttpResponseHeaderConfig); ``` I actually think this is the best way to go, because then you can easily add lots of test methods that thoroughly test each of these methods for both positive and negative cases. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); + Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config)); + } + + private static void validateHttpResponseHeaderConfig(String config) { + try { + // validate format + String[] configTokens = config.trim().split("\\s+", 2); + if (configTokens.length != 2) { + throw new ConfigException(String.format("Invalid format of header config \"%s\". " + + "Expected: \"[ation] [header name]:[header value]\"", config)); Review comment: Nits: ```suggestion throw new ConfigException(String.format("Invalid format of header config '%s'. 
" + "Expected: '[action] [header name]:[header value]'", config)); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); + Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config)); + } + + private static void validateHttpResponseHeaderConfig(String config) { + try { + // validate format + String[] configTokens = config.trim().split("\\s+", 2); + if (configTokens.length != 2) { + throw new ConfigException(String.format("Invalid format of header config \"%s\". " + + "Expected: \"[ation] [header name]:[header value]\"", config)); + } + + // validate action + String method = configTokens[0].trim(); + validateHeaderConfigAction(method); + + // validate header name and header value pair + String header = configTokens[1]; + String[] headerTokens = header.trim().split(":"); + if (headerTokens.length != 2) { + throw new ConfigException( + String.format("Invalid format of header name and header value pair \"%s\". " + + "Expected: \"[header name]:[header value]\"", header)); + } + + // validate header name + String headerName = headerTokens[0].trim(); + if (headerName.isEmpty() || headerName.contains(" ")) { Review comment: Shouldn't this look for other whitespace characters, per the exception message? 
Something like: ```suggestion if (headerName.isEmpty() || headerName.chars().anyMatch(Character::isWhitespace)) { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue.isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); + Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config)); + } + + private static void validateHttpResponseHeaderConfig(String config) { + try { + // validate format + String[] configTokens = config.trim().split("\\s+", 2); + if (configTokens.length != 2) { + throw new ConfigException(String.format("Invalid format of header config \"%s\". " + + "Expected: \"[ation] [header name]:[header value]\"", config)); + } + + // validate action + String method = configTokens[0].trim(); + validateHeaderConfigAction(method); + + // validate header name and header value pair + String header = configTokens[1]; + String[] headerTokens = header.trim().split(":"); + if (headerTokens.length != 2) { + throw new ConfigException( + String.format("Invalid format of header name and header value pair \"%s\". " + + "Expected: \"[header name]:[header value]\"", header)); + } + + // validate header name + String headerName = headerTokens[0].trim(); + if (headerName.isEmpty() || headerName.contains(" ")) { + throw new ConfigException(String.format("Invalid header name \"%s\". 
" + + "The \"[header name]\" cannot contain whitespace", headerName)); + } + } catch (ArrayIndexOutOfBoundsException e) { + throw new ConfigException(String.format("Invalid header config \"%s\".", config), e); + } + } + + private static void validateHeaderConfigAction(String action) { + /** + * The following actions are defined following link. + * {@link https://www.eclipse.org/jetty/documentation/current/header-filter.html} + **/ + if (!Arrays.asList("set", "add", "setDate", "addDate") + .stream() + .anyMatch(action::equalsIgnoreCase)) { + throw new ConfigException(String.format("Invalid header config action: \"%s\". " + + "The action need be one of [\"set\", \"add\", \"setDate\", \"addDate\"]", action)); Review comment: How about defining a static immutable list as a constant: ``` private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList( Arrays.asList("set", "add", "setDate", "addDate") ); ``` so that these lines can become: ```suggestion if (!HEADER_ACTIONS.stream().anyMatch(action::equalsIgnoreCase)) { throw new ConfigException(String.format("Invalid header config action: '%s'. " + "Expected one of %s", action, HEADER_ACTIONS)); ``` This eliminates the duplication of the literal values (which is prone to future errors) and makes the code more readable. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { Review comment: This class should have a `toString()` method that describes what's required, so some string like: > Comma-separated header rules, where each header rule is of the form '[action] [header name]:[header value]' and optionally surrounded by double quotes if any part of a header rule contains a comma ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java ########## @@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException { Assert.assertEquals(404, response.getStatusLine().getStatusCode()); } + @Test + public void testValidCustomizedHttpResponseHeaders() throws IOException { + String headerConfig = + "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\""; + Map<String, String> expectedHeaders = new HashMap<>(); + expectedHeaders.put("X-XSS-Protection", "1; mode=block"); + expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate"); + checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders); + } + + @Test + public void testDefaultCustomizedHttpResponseHeaders() throws IOException { + String headerConfig = ""; + Map<String, String> expectedHeaders = new HashMap<>(); + checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders); + } + + @Test(expected = ConfigException.class) + public void testInvalidHeaderConfigFormat() { + String headerConfig = "set add X-XSS-Protection: 1"; + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig); + WorkerConfig workerConfig = new DistributedConfig(workerProps); + } + + @Test(expected = ConfigException.class) + public void 
testMissedAction() { + String headerConfig = "X-Frame-Options: DENY"; + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig); + WorkerConfig workerConfig = new DistributedConfig(workerProps); + } + + @Test(expected = ConfigException.class) + public void testMissedHeaderName() { + String headerConfig = "add :DENY"; + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig); + WorkerConfig workerConfig = new DistributedConfig(workerProps); + } + + @Test(expected = ConfigException.class) + public void testMissedHeaderValue() { + String headerConfig = "add X-Frame-Options"; + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig); + WorkerConfig workerConfig = new DistributedConfig(workerProps); + } + + @Test(expected = ConfigException.class) + public void testInvalidHeaderConfigAction() { + String headerConfig = "badaction X-XSS-Protection: 1; mode=block"; + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig); + WorkerConfig workerConfig = new DistributedConfig(workerProps); + } + + public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders) + throws IOException { + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put("offset.storage.file.filename", "/tmp"); + workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig); + WorkerConfig workerConfig = new DistributedConfig(workerProps); + + EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID); + EasyMock.expect(herder.plugins()).andStubReturn(plugins); + EasyMock.expect(plugins.newPlugins(Collections.emptyList(), + workerConfig, + ConnectRestExtension.class)).andStubReturn(Collections.emptyList()); + + 
EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b")); + + PowerMock.replayAll(); + + server = new RestServer(workerConfig); + server.initializeServer(); + server.initializeResources(herder); + HttpRequest request = new HttpGet("/connectors"); + CloseableHttpClient httpClient = HttpClients.createMinimal(); + HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort()); + CloseableHttpResponse response = httpClient.execute(httpHost, request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + if (!headerConfig.isEmpty()) { + expectedHeaders.forEach((k, v) -> + Assert.assertEquals(response.getFirstHeader(k).getValue(), v)); + } else { + Assert.assertNull(response.getFirstHeader("X-Frame-Options")); + } + response.close(); + server.stop(); + } + Review comment: These are great and very useful, but they still don't thoroughly test the `validateHttpResponseHeaderConfig` and `validateHeaderConfigAction` methods. If you make these methods static, package-protected in the `WorkerConfig` class (rather than in a nested class), then you can easily add more tests to `WorkerConfigTest` that thoroughly verify all of the logic in those methods. 
For example, something like the following in `WorkerConfigTest`: ``` protected static final List<String> VALID_UNQUOTED_HEADER_CONFIGS = Arrays.asList( // TODO: Add a lot more valid header configs "\"add Cache-Control: no-cache, no-store, must-revalidate\"", "add X-XSS-Protection: 1; mode=block", "add Strict-Transport-Security: max-age=31536000; includeSubDomains", "AdD Strict-Transport-Security: max-age=31536000; includeSubDomains", "AdD \t Strict-Transport-Security : \n max-age=31536000; includeSubDomains", "add X-Content-Type-Options: nosniff" ); protected static final List<String> VALID_QUOTE_REQUIRED_HEADER_CONFIGS = Arrays.asList( // TODO: Add a lot more valid header configs "add Cache-Control: no-cache, no-store, must-revalidate" ); protected static final List<String> INVALID_UNQUOTED_HEADER_CONFIGS = Arrays.asList( // TODO: Add a lot more invalid header configs "WRONG Cache-Control: no-cache", "add Cache-Control no-cache", "WRONG Cache-Control: no-cache, no-store, must-revalidate" ); protected static final String WHITESPACE = " \t \n "; @Test public void testSingleValidHeaderConfigs() { for (String config : VALID_UNQUOTED_HEADER_CONFIGS) { assertValidHeaderConfig(config); } for (String config : VALID_QUOTE_REQUIRED_HEADER_CONFIGS) { assertValidHeaderConfig("\"" + config + "\""); } } @Test public void testSingleValidHeaderConfigsWithWhitespace() { for (String config : VALID_UNQUOTED_HEADER_CONFIGS) { assertValidHeaderConfig(WHITESPACE + config + WHITESPACE); } for (String config : VALID_QUOTE_REQUIRED_HEADER_CONFIGS) { assertValidHeaderConfig(WHITESPACE + "\"" + WHITESPACE + config + WHITESPACE + "\"" + WHITESPACE); } } @Test public void testMultipleValidHeaderConfigsWithoutWhitespace() { assertValidHeaderConfig(String.join(", ", VALID_UNQUOTED_HEADER_CONFIGS)); assertValidHeaderConfig(String.join(" , ", VALID_UNQUOTED_HEADER_CONFIGS)); } @Test public void testHeaderConfigsThatRequireQuotes() { for (String config : VALID_QUOTE_REQUIRED_HEADER_CONFIGS) { 
assertInvalidHeaderConfig(config); } } @Test public void testInvalidHeaderConfigs() { for (String config : INVALID_UNQUOTED_HEADER_CONFIGS) { assertInvalidHeaderConfig(config); } } @Test public void testOneInvalidAndMultipleValidHeaderConfigs() { assertInvalidHeaderConfig(String.join(", ", VALID_UNQUOTED_HEADER_CONFIGS) + ", " + INVALID_UNQUOTED_HEADER_CONFIGS.get(0)); assertInvalidHeaderConfig(INVALID_UNQUOTED_HEADER_CONFIGS.get(0) + ", " + String.join(", ", VALID_UNQUOTED_HEADER_CONFIGS)); } protected void assertValidHeaderConfig(String config) { WorkerConfig.validateHttpResponseHeaderConfig(config); // any valid config should be valid per HeaderFilter configureHeaderFilter(config); } protected void assertInvalidHeaderConfig(String config) { assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config)); // any invalid config should also be invalid per HeaderFilter assertThrows(ConfigException.class, () -> configureHeaderFilter(config)); } protected void configureHeaderFilter(String headerConfig) { FilterHolder headerFilterHolder = new FilterHolder(HeaderFilter.class); headerFilterHolder.setInitParameter("headerConfig", headerConfig); try { try { headerFilterHolder.doStart(); headerFilterHolder.initialize(); } finally { headerFilterHolder.doStop(); } } catch (Exception e) { // wrap in ConfigException to keep the test simple throw new ConfigException("HeaderFilter failure", e); } } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org