lucasbru commented on code in PR #21724:
URL: https://github.com/apache/kafka/pull/21724#discussion_r2939082225


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
         final Map<String, String> clientTags = getClientTags();
 
         if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
-            throw new ConfigException("At most " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
-                                      "can be specified using " + 
CLIENT_TAG_PREFIX + " prefix.");
+            throw new ConfigException(
+                String.format(
+                    "At most %s client tags can be specified using %s prefix.",
+                    MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+                    CLIENT_TAG_PREFIX
+                )
+            );
         }
 
         for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+            if (rackAwareAssignmentTag.isEmpty()) {
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    "Contains invalid value []. Tag key cannot be empty."
+                );
+            }
             if (!clientTags.containsKey(rackAwareAssignmentTag)) {
-                throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
-                                          rackAwareAssignmentTags,
-                                          "Contains invalid value [" + 
rackAwareAssignmentTag + "] " +
-                                          "which doesn't have corresponding 
tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    String.format(
+                        "Contains invalid value [%s] which doesn't have 
corresponding tag set via [%s] prefix.",
+                        rackAwareAssignmentTag,
+                        CLIENT_TAG_PREFIX
+                    )
+                );
             }
         }
 
         clientTags.forEach((tagKey, tagValue) -> {
+            if (tagKey.trim().isEmpty()) {
+                throw new ConfigException("Invalid config `client.tag.` 
(missing client tag key).");
+            }
+            if (tagValue.trim().isEmpty()) {
+                throw new ConfigException(
+                    CLIENT_TAG_PREFIX + tagKey,
+                    "[]",

Review Comment:
   Fair point, the double blank is not great. I think `"[]"` is fine as a 
pragmatic choice — changing `ConfigException` itself is out of scope.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
         final Map<String, String> clientTags = getClientTags();
 
         if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
-            throw new ConfigException("At most " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
-                                      "can be specified using " + 
CLIENT_TAG_PREFIX + " prefix.");
+            throw new ConfigException(
+                String.format(
+                    "At most %s client tags can be specified using %s prefix.",
+                    MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+                    CLIENT_TAG_PREFIX
+                )
+            );
         }
 
         for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+            if (rackAwareAssignmentTag.isEmpty()) {
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    "Contains invalid value []. Tag key cannot be empty."
+                );
+            }
             if (!clientTags.containsKey(rackAwareAssignmentTag)) {
-                throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
-                                          rackAwareAssignmentTags,
-                                          "Contains invalid value [" + 
rackAwareAssignmentTag + "] " +
-                                          "which doesn't have corresponding 
tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    String.format(
+                        "Contains invalid value [%s] which doesn't have 
corresponding tag set via [%s] prefix.",
+                        rackAwareAssignmentTag,
+                        CLIENT_TAG_PREFIX
+                    )
+                );
             }
         }
 
         clientTags.forEach((tagKey, tagValue) -> {
+            if (tagKey.trim().isEmpty()) {
+                throw new ConfigException("Invalid config `client.tag.` 
(missing client tag key).");

Review Comment:
   Good point, makes sense. The problem is the key, not the value, so the 
custom message is the right approach here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to