lucasbru commented on code in PR #21724:
URL: https://github.com/apache/kafka/pull/21724#discussion_r2930512225


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
         final Map<String, String> clientTags = getClientTags();
 
         if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
-            throw new ConfigException("At most " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
-                                      "can be specified using " + 
CLIENT_TAG_PREFIX + " prefix.");
+            throw new ConfigException(
+                String.format(
+                    "At most %s client tags can be specified using %s prefix.",
+                    MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+                    CLIENT_TAG_PREFIX
+                )
+            );
         }
 
         for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+            if (rackAwareAssignmentTag.isEmpty()) {
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    "Contains invalid value []. Tag key cannot be empty."
+                );
+            }
             if (!clientTags.containsKey(rackAwareAssignmentTag)) {
-                throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
-                                          rackAwareAssignmentTags,
-                                          "Contains invalid value [" + 
rackAwareAssignmentTag + "] " +
-                                          "which doesn't have corresponding 
tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    String.format(
+                        "Contains invalid value [%s] which doesn't have 
corresponding tag set via [%s] prefix.",
+                        rackAwareAssignmentTag,
+                        CLIENT_TAG_PREFIX
+                    )
+                );
             }
         }
 
         clientTags.forEach((tagKey, tagValue) -> {
+            if (tagKey.trim().isEmpty()) {
+                throw new ConfigException("Invalid config `client.tag.` 
(missing client tag key).");
+            }
+            if (tagValue.trim().isEmpty()) {
+                throw new ConfigException(
+                    CLIENT_TAG_PREFIX + tagKey,
+                    "[]",

Review Comment:
   The second argument to `ConfigException` is supposed to be the actual 
configured value. Passing a hardcoded `"[]"` is misleading — the user set an 
empty or whitespace-only value (e.g. `" "`), not `[]`. Consider passing 
`tagValue` instead so the error message reflects what was actually configured.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
         final Map<String, String> clientTags = getClientTags();
 
         if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
-            throw new ConfigException("At most " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
-                                      "can be specified using " + 
CLIENT_TAG_PREFIX + " prefix.");
+            throw new ConfigException(
+                String.format(
+                    "At most %s client tags can be specified using %s prefix.",
+                    MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+                    CLIENT_TAG_PREFIX
+                )
+            );
         }
 
         for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+            if (rackAwareAssignmentTag.isEmpty()) {
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    "Contains invalid value []. Tag key cannot be empty."
+                );
+            }
             if (!clientTags.containsKey(rackAwareAssignmentTag)) {
-                throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
-                                          rackAwareAssignmentTags,
-                                          "Contains invalid value [" + 
rackAwareAssignmentTag + "] " +
-                                          "which doesn't have corresponding 
tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    String.format(
+                        "Contains invalid value [%s] which doesn't have 
corresponding tag set via [%s] prefix.",
+                        rackAwareAssignmentTag,
+                        CLIENT_TAG_PREFIX
+                    )
+                );
             }
         }
 
         clientTags.forEach((tagKey, tagValue) -> {
+            if (tagKey.trim().isEmpty()) {
+                throw new ConfigException("Invalid config `client.tag.` 
(missing client tag key).");

Review Comment:
   This uses the single-argument `ConfigException` constructor with a fully 
custom message, while the other per-tag errors in this method use the 
three-argument constructor (`name, value, message`). For consistency, consider 
using the three-argument form here too, e.g. `new 
ConfigException(CLIENT_TAG_PREFIX, tagKey, "Tag key cannot be empty.")`.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
         final Map<String, String> clientTags = getClientTags();
 
         if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
-            throw new ConfigException("At most " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
-                                      "can be specified using " + 
CLIENT_TAG_PREFIX + " prefix.");
+            throw new ConfigException(
+                String.format(
+                    "At most %s client tags can be specified using %s prefix.",
+                    MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+                    CLIENT_TAG_PREFIX
+                )
+            );
         }
 
         for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+            if (rackAwareAssignmentTag.isEmpty()) {

Review Comment:
   This checks `isEmpty()` without `trim()`, while below at line 1698 you check 
`tagKey.trim().isEmpty()`. This is technically correct because 
`COMMA_WITH_WHITESPACE` already strips whitespace during list parsing, but the 
inconsistency could confuse future readers. Maybe worth a brief comment 
explaining why `trim()` isn't needed here.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1182,7 +1182,34 @@ public void shouldGetClientTagsMapWhenSet() {
     @Test
     public void 
shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithUnknownTags() {
         props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster");
-        assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
+        assertEquals(
+            "Invalid value [cluster] for configuration 
rack.aware.assignment.tags: Contains invalid value [cluster] which doesn't have 
corresponding tag set via [client.tag.] prefix.",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldAllowWhitespacesInRackAwareAssignmentTagsList() {
+        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, " zone , 
cluster ");
+        props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
+        props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");
+        final StreamsConfig config = new StreamsConfig(props);
+        final Map<String, String> clientTags = config.getClientTags();
+        assertEquals(2, clientTags.size());
+        assertEquals("eu-central-1a", clientTags.get("zone"));
+        assertEquals("cluster-1", clientTags.get("cluster"));

Review Comment:
   This test validates existing behavior (the `COMMA_WITH_WHITESPACE` pattern 
in `ConfigDef` already handles whitespace around commas) rather than new code 
from this PR. Fine to keep as a regression test, but consider renaming it to 
make clear that it documents existing behavior rather than testing new 
functionality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to