Copilot commented on code in PR #24991:
URL: https://github.com/apache/pulsar/pull/24991#discussion_r2657790731


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
             }
         });
     }
+
+    protected CompletableFuture<Void> internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+        // Feature flag check
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));
+        }
+
+        // Validate labels against allowed keys and value length
+        Set<String> allowedKeys = getAllowedCustomMetricLabelKeys();
+        int maxValueLength = pulsar().getConfiguration().getMaxCustomMetricLabelValueLength();
+
+        if (labels != null && !labels.isEmpty()) {
+            for (Map.Entry<String, String> entry : labels.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+
+                // Check if key is allowed
+                if (allowedKeys.isEmpty() || !allowedKeys.contains(key)) {

Review Comment:
   The validation logic has a flaw: when the feature is enabled but allowedKeys
is empty, every key is rejected, so the feature cannot be used at all. The
condition should be 'if (!allowedKeys.isEmpty() && !allowedKeys.contains(key))'
so that keys are validated only when an allow-list is configured.
   ```suggestion
                   // Check if key is allowed (only when there are configured allowed keys)
                   if (!allowedKeys.isEmpty() && !allowedKeys.contains(key)) {
   ```
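
   As a minimal sketch of the suggested semantics, assuming an empty allow-list
is meant to mean "no restriction" (the PR's current code treats it as "reject
everything"): KeyCheck and isAllowed are hypothetical names used only for
illustration and are not part of the PR.

```java
import java.util.Set;

// Hypothetical helper, not the PR's code: an empty allow-list is
// interpreted here as "no restriction configured".
final class KeyCheck {
    static boolean isAllowed(Set<String> allowedKeys, String key) {
        return allowedKeys.isEmpty() || allowedKeys.contains(key);
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(Set.of(), "sla_tier"));
        System.out.println(isAllowed(Set.of("sla_tier"), "cost_center"));
    }
}
```

   The rejection branch in the diff would then fire on !isAllowed(...), which
is equivalent to the condition in the suggestion above.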



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
             }
         });
     }
+
+    protected CompletableFuture<Void> internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+        // Feature flag check
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));
+        }
+
+        // Validate labels against allowed keys and value length
+        Set<String> allowedKeys = getAllowedCustomMetricLabelKeys();
+        int maxValueLength = pulsar().getConfiguration().getMaxCustomMetricLabelValueLength();
+
+        if (labels != null && !labels.isEmpty()) {
+            for (Map.Entry<String, String> entry : labels.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+
+                // Check if key is allowed
+                if (allowedKeys.isEmpty() || !allowedKeys.contains(key)) {
+                    return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                            "Label key '" + key + "' is not in the list of allowed custom metric label keys"));
+                }
+
+                // Check value length
+                if (value != null && value.length() > maxValueLength) {
+                    return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                            "Label value for key '" + key + "' exceeds maximum length of " + maxValueLength));
+                }
+            }
+        }
+
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal,
+                labels == null || labels.isEmpty(), policies -> {
+                    if (labels == null || labels.isEmpty()) {
+                        policies.setCustomMetricLabels(new HashMap<>());
+                    } else {
+                        policies.setCustomMetricLabels(new HashMap<>(labels));
+                    }
+                });
+    }
+
+    protected CompletableFuture<Map<String, String>> internalGetCustomMetricLabels(boolean isGlobal) {
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));
+        }
+
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenApply(op -> op.map(TopicPolicies::getCustomMetricLabels)
+                        .orElse(null));
+    }
+
+    protected CompletableFuture<Void> internalRemoveCustomMetricLabels(boolean removeAll, List<String> keys,
+                                                                       boolean isGlobal) {
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));
+        }
+
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> {
+            Map<String, String> currentLabels = policies.getCustomMetricLabels();
+            if (currentLabels == null || currentLabels.isEmpty()) {
+                return; // Nothing to remove
+            }
+            if (removeAll) {
+                policies.setCustomMetricLabels(new HashMap<>());
+            } else {
+                for (String key : keys) {
+                    currentLabels.remove(key);
+                }
+                policies.setCustomMetricLabels(currentLabels);
+            }

Review Comment:
   When removeAll is false, the code iterates over keys without first checking
whether it is null or empty. If the caller passes removeAll=false with
keys=null, the for-loop throws a NullPointerException; with an empty list, the
call is a silent no-op and the user gets no feedback that nothing was removed.
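
   One possible guard, sketched outside of Pulsar so it can run on its own.
LabelRemoval and removeLabels are hypothetical stand-ins for the body of the
lambda passed to updateTopicPoliciesAsync, and IllegalArgumentException is a
placeholder for whatever RestException the endpoint should return.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the removal logic; not the PR's actual code.
final class LabelRemoval {
    static Map<String, String> removeLabels(Map<String, String> current,
                                            boolean removeAll, List<String> keys) {
        if (removeAll) {
            return new HashMap<>();
        }
        // Reject the ambiguous request up front instead of hitting an NPE
        // in the loop below or silently doing nothing.
        if (keys == null || keys.isEmpty()) {
            throw new IllegalArgumentException(
                    "Either removeAll must be true or at least one key must be given");
        }
        // Copy so the caller's map is never mutated.
        Map<String, String> result = current == null ? new HashMap<>() : new HashMap<>(current);
        for (String key : keys) {
            result.remove(key);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> labels = Map.of("sla_tier", "gold", "app_owner", "team-a");
        System.out.println(removeLabels(labels, false, List.of("sla_tier")));
    }
}
```

   In the real method the check would probably belong before the call to
updateTopicPoliciesAsync, so that an invalid request fails without touching the
topic policies.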



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -5131,5 +5131,95 @@ public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse,
                 });
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
+    @ApiOperation(value = "Set custom metric labels for a topic")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic doesn't exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled"),
+            @ApiResponse(code = 412, message = "Feature is disabled or invalid label keys/values"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setCustomMetricLabels(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Custom metric labels") Map<String, String> labels) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+                .thenCompose(__ -> preValidation(authoritative))
+                .thenCompose(__ -> internalSetCustomMetricLabels(labels, isGlobal))
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("setCustomMetricLabels", ex, asyncResponse);
+                    return null;
+                });
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
+    @ApiOperation(value = "Get custom metric labels for a topic", response = Map.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 200, message = "OK"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled"),
+            @ApiResponse(code = 412, message = "Feature is disabled"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getCustomMetricLabels(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+                .thenCompose(__ -> preValidation(authoritative))
+                .thenCompose(__ -> internalGetCustomMetricLabels(isGlobal))
+                .thenApply(asyncResponse::resume).exceptionally(ex -> {
+                    handleTopicPolicyException("getCustomMetricLabels", ex, asyncResponse);
+                    return null;
+                });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
+    @ApiOperation(value = "Remove custom metric labels from a topic")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled"),
+            @ApiResponse(code = 412, message = "Feature is disabled"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeCustomMetricLabels(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("all") @DefaultValue("false") boolean removeAll,
+            @QueryParam(value = "List of keys to remove, or null to remove all") List<String> keys) {

Review Comment:
   The @QueryParam annotation has an incorrect 'value' attribute. The value 
should be the parameter name (e.g., "keys"), not a description. The description 
should be in @ApiParam or similar documentation annotations. This will cause 
the query parameter binding to fail.
   ```suggestion
               @ApiParam(value = "List of keys to remove, or null to remove all") @QueryParam("keys") List<String> keys) {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
             }
         });
     }
+
+    protected CompletableFuture<Void> internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+        // Feature flag check
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));

Review Comment:
   The error message refers to "not enabled" but this is inconsistent with 
other error messages in the same file which use "disabled". For consistency, 
consider using "Custom topic metric labels feature is disabled" in all three 
methods.



##########
conf/broker.conf:
##########
@@ -1846,6 +1846,21 @@ metricsServletTimeoutMs=30000
 # Enable or disable broker bundles metrics. The default value is false.
 exposeBundlesMetricsInPrometheus=false
 
+# Enable or disable custom topic metric labels feature.
+# If enabled, custom metric labels can be set on topics and will be exposed in Prometheus metrics.
+# Default is false.
+exposeCustomTopicMetricLabelsEnabled=false
+
+# A comma-separated list of allowed custom metric label keys.
+# Only these keys can be set as custom metric labels on topics.
+# Example: sla_tier,data_sensitivity,cost_center,app_owner
+# If empty and the feature is enabled, no custom metric labels can be set.

Review Comment:
   The documentation states "If empty and the feature is enabled, no custom
metric labels can be set". That matches the validation logic at line 5487,
which rejects ALL keys when allowedKeys is empty, but the behavior may be
unexpected: it would be more intuitive to allow any key when the list is empty
(no restrictions) than to block every key. Either the documentation should
state the reject-all behavior explicitly or the validation logic should be
reconsidered.
   ```suggestion
   # If empty and the feature is enabled, all custom metric labels are rejected; specify one or more keys to allow labels.
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
             }
         });
     }
+
+    protected CompletableFuture<Void> internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+        // Feature flag check
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));
+        }
+
+        // Validate labels against allowed keys and value length
+        Set<String> allowedKeys = getAllowedCustomMetricLabelKeys();
+        int maxValueLength = pulsar().getConfiguration().getMaxCustomMetricLabelValueLength();
+
+        if (labels != null && !labels.isEmpty()) {
+            for (Map.Entry<String, String> entry : labels.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+
+                // Check if key is allowed
+                if (allowedKeys.isEmpty() || !allowedKeys.contains(key)) {
+                    return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                            "Label key '" + key + "' is not in the list of allowed custom metric label keys"));
+                }
+
+                // Check value length
+                if (value != null && value.length() > maxValueLength) {
+                    return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                            "Label value for key '" + key + "' exceeds maximum length of " + maxValueLength));
+                }
+            }
+        }
+
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal,
+                labels == null || labels.isEmpty(), policies -> {
+                    if (labels == null || labels.isEmpty()) {
+                        policies.setCustomMetricLabels(new HashMap<>());
+                    } else {
+                        policies.setCustomMetricLabels(new HashMap<>(labels));
+                    }
+                });
+    }
+
+    protected CompletableFuture<Map<String, String>> internalGetCustomMetricLabels(boolean isGlobal) {
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));
+        }
+
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                .thenApply(op -> op.map(TopicPolicies::getCustomMetricLabels)
+                        .orElse(null));
+    }
+
+    protected CompletableFuture<Void> internalRemoveCustomMetricLabels(boolean removeAll, List<String> keys,
+                                                                       boolean isGlobal) {
+        if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Custom topic metric labels feature is disabled"));
+        }
+
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> {
+            Map<String, String> currentLabels = policies.getCustomMetricLabels();
+            if (currentLabels == null || currentLabels.isEmpty()) {
+                return; // Nothing to remove
+            }
+            if (removeAll) {
+                policies.setCustomMetricLabels(new HashMap<>());
+            } else {
+                for (String key : keys) {
+                    currentLabels.remove(key);
+                }
+                policies.setCustomMetricLabels(currentLabels);
+            }
+        });
+    }
+
+    private Set<String> getAllowedCustomMetricLabelKeys() {
+        String allowedKeysStr = pulsar().getConfiguration().getAllowedCustomMetricLabelKeys();
+        if (allowedKeysStr == null || allowedKeysStr.trim().isEmpty()) {
+            return Set.of();
+        }
+        return Set.of(allowedKeysStr.split(","));

Review Comment:
   The getAllowedCustomMetricLabelKeys method does not trim individual keys 
after splitting by comma. If the configuration contains spaces like "key1, 
key2, key3", the keys will include leading/trailing spaces and won't match user 
input. Each key should be trimmed after splitting.
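
   A minimal sketch of the trimming, assuming the setting stays a single
comma-separated string. AllowedKeys and parse are hypothetical names; in the PR
this logic would live in getAllowedCustomMetricLabelKeys().

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the trimming; not the PR's actual code.
final class AllowedKeys {
    static Set<String> parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Set.of();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)          // "key1, key2" -> "key1", "key2"
                .filter(k -> !k.isEmpty())  // drop blanks from input like "a,,b"
                .collect(Collectors.toUnmodifiableSet());
    }

    public static void main(String[] args) {
        System.out.println(parse("key1, key2, key3").contains("key2"));
    }
}
```

   Collecting through a stream also tolerates a key that is listed twice,
whereas Set.of(...) throws IllegalArgumentException when given duplicate
elements.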



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -3073,4 +3077,77 @@ void run() throws Exception {
             System.out.println(getAdmin().topics().getMessageIdByIndex(topic, index));
         }
     }
+
+    @Command(description = "Get custom metric labels for a topic")
+    private class GetCustomMetricLabels extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
+        private String topicName;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topic = validateTopicName(topicName);
+            print(getAdmin().topicPolicies().getCustomMetricLabels(topic));
+        }
+    }
+
+    @Command(description = "Set custom metric labels for a topic")
+    private class SetCustomMetricLabels extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
+        private String topicName;
+
+        @Option(names = {"--labels",
+            "-l"}, description = "Custom metric labels (key=value pairs, comma separated, e.g. sla_tier=gold,"
+            + "app_owner=team-a)", required = true)
+        private String labelsStr;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topic = validateTopicName(topicName);
+            Map<String, String> labels = new HashMap<>();
+
+            if (labelsStr != null && !labelsStr.trim().isEmpty()) {
+                String[] pairs = labelsStr.split(",");
+                for (String pair : pairs) {
+                    String[] kv = pair.split("=", 2);
+                    if (kv.length != 2) {
+                        throw new ParameterException("Invalid label format: " + pair + ". Expected format: key=value");
+                    }
+                    labels.put(kv[0].trim(), kv[1].trim());
+                }
+            }
+
+            getAdmin().topicPolicies().setCustomMetricLabels(topic, labels);
+        }
+    }
+
+    @Command(description = "Remove custom metric labels from a topic")
+    private class RemoveCustomMetricLabels extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
+        private String topicName;
+
+        @Option(names = {"--keys", "-k"}, description = "Label keys to remove"
+            + " (comma separated, e.g. sla_tier,app_owner). If not specified, "
+            + "all labels will be removed.", required = false)
+        private String keysStr;
+
+        @Option(names = {"--all", "-a"}, description = "Remove all labels", required = false)
+        private boolean removeAll;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topic = validateTopicName(topicName);
+
+            if (!removeAll) {
+                List<String> keys = Arrays.asList(keysStr.split(","));
+                keys = keys.stream().map(String::trim).collect(Collectors.toList());
+                if (keys.isEmpty()) {
+                    throw new ParameterException("No label keys specified for removal.");
+                }
+                getAdmin().topicPolicies().removeCustomMetricLabels(topic, false, keys);

Review Comment:
   When neither --keys nor --all is specified, keysStr will be null, causing a 
NullPointerException on line 3141 when split(",") is called. The code should 
check if keysStr is null or empty before attempting to split it, or require at 
least one of the options to be provided.



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java:
##########
@@ -1323,6 +1323,47 @@ public CompletableFuture<Void> deleteTopicPoliciesAsync(String topic) {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setCustomMetricLabels(String topic, Map<String, String> labels) throws PulsarAdminException {
+        sync(() -> setCustomMetricLabelsAsync(topic, labels));
+    }
+
+    @Override
+    public CompletableFuture<Void> setCustomMetricLabelsAsync(String topic, Map<String, String> labels) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "customMetricLabels");
+        return asyncPostRequest(path, Entity.entity(labels, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public Map<String, String> getCustomMetricLabels(String topic) throws PulsarAdminException {
+        return sync(() -> getCustomMetricLabelsAsync(topic));
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> getCustomMetricLabelsAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "customMetricLabels");
+        return asyncGetRequest(path, new FutureCallback<Map<String, String>>(){});
+    }
+
+    @Override
+    public void removeCustomMetricLabels(String topic, boolean removeAll, List<String> keys) throws PulsarAdminException {
+        sync(() -> removeCustomMetricLabelsAsync(topic, removeAll, keys));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeCustomMetricLabelsAsync(String topic, boolean removeAll, List<String> keys) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "customMetricLabels");
+        if (removeAll) {
+            path.queryParam("all", true);
+        } else if (keys != null && !keys.isEmpty()) {
+            path.queryParam("keys", keys);

Review Comment:
   The return value of path.queryParam() is discarded. WebTarget.queryParam()
does not modify the target it is called on; it returns a new instance. Because
path is never reassigned, the subsequent asyncDeleteRequest uses the original
target and the query parameters are never sent to the server.
   ```suggestion
               path = path.queryParam("all", true);
           } else if (keys != null && !keys.isEmpty()) {
               path = path.queryParam("keys", keys);
   ```
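
   The effect can be reproduced without JAX-RS. ImmutableTarget below is a
hypothetical stand-in that only mimics the "return a new instance" contract of
WebTarget.queryParam(); it is not the real class and does no URL encoding.

```java
// Hypothetical immutable stand-in for WebTarget, for illustration only.
final class ImmutableTarget {
    private final String uri;

    ImmutableTarget(String uri) {
        this.uri = uri;
    }

    // Returns a NEW target; the receiver is left unchanged.
    ImmutableTarget queryParam(String name, Object value) {
        String separator = uri.contains("?") ? "&" : "?";
        return new ImmutableTarget(uri + separator + name + "=" + value);
    }

    String uri() {
        return uri;
    }

    public static void main(String[] args) {
        ImmutableTarget path = new ImmutableTarget("/customMetricLabels");
        path.queryParam("all", true);        // result discarded, as in the diff
        System.out.println(path.uri());
        path = path.queryParam("all", true); // result kept, as in the suggestion
        System.out.println(path.uri());
    }
}
```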



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
