abhishekrb19 commented on code in PR #18598:
URL: https://github.com/apache/druid/pull/18598#discussion_r2404184040


##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -20,20 +20,23 @@
 package org.apache.druid.emitter.prometheus;
 
 import io.prometheus.client.SimpleCollector;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class DimensionsAndCollector
 {
   private final String[] dimensions;
   private final SimpleCollector collector;
   private final double conversionFactor;
   private final double[] histogramBuckets;
+  private final AtomicLong lastUpdateTime;

Review Comment:
   If it is reset to a previous value, would that still show up as a separate 
time series?



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +94,18 @@ public void start()
       } else {
         log.error("HTTPServer is already started");
       }
+      // Start TTL scheduler if TTL is configured
+      if (config.getMetricTtlMs() != null) {
+        ttlExecutor = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+        // Check TTL every minute
+        ttlExecutor.scheduleAtFixedRate(
+            this::checkMetricTtl,
+            config.getMetricTtlMs(),
+            config.getMetricTtlMs(),
+            TimeUnit.MILLISECONDS

Review Comment:
   Does this need to be running every millisecond? Seems like running this at 
`TimeUnit.SECONDS` or even at like 5s or so should suffice since the default 
metric emission period is 1 minute.
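
For reference, a minimal sketch of how `scheduleAtFixedRate` treats its arguments, using only `java.util.concurrent` rather than Druid's `ScheduledExecutors`. The task fires once per `period` expressed in `unit`, so a value of 60000 with `TimeUnit.MILLISECONDS` and a value of 60 with `TimeUnit.SECONDS` give the same cadence. The class and method names are hypothetical and not part of the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch, not the PR's code. */
class TtlSchedulingSketch
{
  /**
   * Runs {@code check} once per {@code period}, measured in {@code unit},
   * with the first run after one full period. The (period, unit) pair alone
   * decides how often the task fires.
   */
  static ScheduledExecutorService schedule(Runnable check, long period, TimeUnit unit)
  {
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    exec.scheduleAtFixedRate(check, period, period, unit);
    return exec;
  }
}
```

The caller owns the returned executor and would call `shutdownNow()` on it when the emitter closes.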



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java:
##########
@@ -86,10 +90,11 @@ public PrometheusEmitterConfig(
       @JsonProperty("pushGatewayAddress") @Nullable String pushGatewayAddress,
       @JsonProperty("addHostAsLabel") boolean addHostAsLabel,
       @JsonProperty("addServiceAsLabel") boolean addServiceAsLabel,
-      @JsonProperty("flushPeriod") Integer flushPeriod,
+      @JsonProperty("flushPeriod") @Nullable Integer flushPeriod,
       @JsonProperty("extraLabels") @Nullable Map<String, String> extraLabels,
       @JsonProperty("deletePushGatewayMetricsOnShutdown") @Nullable Boolean 
deletePushGatewayMetricsOnShutdown,
-      @JsonProperty("waitForShutdownDelay") @Nullable Long waitForShutdownDelay
+      @JsonProperty("waitForShutdownDelay") @Nullable Long 
waitForShutdownDelay,
+      @JsonProperty("metricTtlMs") @Nullable Long metricTtlMs

Review Comment:
   Could you update the Prometheus emitter docs to add this property? Do you 
have guidance on a `metricTtlMs` value that would work well for most users?



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +278,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void checkMetricTtl()
+  {
+    if (config.getMetricTtlMs() == null) {
+      return;
+    }
+
+    Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+    for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+      if (entry.getValue().isExpired(config.getMetricTtlMs())) {
+        log.info("Metric [%s] has expired (last updated %d ms ago)",

Review Comment:
   ```suggestion
           log.info("Clearing metric[%s] since it has expired (last updated [%,d] ms ago)",
   ```



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void cleanUpStaleMetrics()
+  {
+    if (config.getFlushPeriod() == null) {
+      return;
+    }
+
+    Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+    for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+      if (entry.getValue().isExpired(config.getFlushPeriod())) {

Review Comment:
   Extract `entry.getValue()` to a variable and reuse since it's used a few 
times in this loop :) 



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +93,18 @@ public void start()
       } else {
         log.error("HTTPServer is already started");
       }
+      // Start TTL scheduler if TTL is configured
+      if (config.getFlushPeriod() != null) {
+        exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+        // Check TTL every minute

Review Comment:
   I think this comment is stale?



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void cleanUpStaleMetrics()
+  {
+    if (config.getFlushPeriod() == null) {
+      return;
+    }
+
+    Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+    for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+      if (entry.getValue().isExpired(config.getFlushPeriod())) {
+        log.debug("Metric [%s] has expired (last updated %d ms ago)",
+                 entry.getKey(),
+                 entry.getValue().getTimeSinceLastUpdate());
+        entry.getValue().getCollector().clear();

Review Comment:
   It looks like `clear()` removes all children for the metric. Do we instead 
want to use `remove()` to target a specific time series by specifying the 
corresponding set of label values?
   
   Without that, metrics that track things like task ID or query ID, for 
example, `task/run/time` or `query/wait/time` mentioned in 
https://github.com/apache/druid/issues/14638 wouldn’t get evicted if they’re 
being updated frequently.
   
   When we call `metric.resetLastUpdateTime()`, should we pass the appropriate 
label set and track it? Then in `cleanUpStaleMetrics()`, we could check the 
label sets and clear only the stale time series rather than clearing all of 
them or none at all. What do you think?
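
A minimal sketch of the per-label-set tracking described above, assuming a small helper that sits next to the collector. `LabelSetTtlTracker` and its methods are hypothetical names, not part of the PR; the timestamps are passed in explicitly so the logic is easy to test. The caller would evict each returned label set from the Prometheus collector with `remove(...)` instead of calling `clear()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not part of the PR): remembers when each label-value
 * combination of a single metric was last updated.
 */
class LabelSetTtlTracker
{
  private final Map<List<String>, Long> lastUpdateMillis = new ConcurrentHashMap<>();
  private final long ttlMillis;

  LabelSetTtlTracker(long ttlMillis)
  {
    this.ttlMillis = ttlMillis;
  }

  /** Call wherever the emitter updates a child, with the same label values. */
  void markUpdated(List<String> labelValues, long nowMillis)
  {
    lastUpdateMillis.put(List.copyOf(labelValues), nowMillis);
  }

  /**
   * Removes and returns the label sets that were not updated within the TTL.
   * The caller would then evict each one from the collector, for example with
   * collector.remove(labels.toArray(new String[0])).
   */
  List<List<String>> evictExpired(long nowMillis)
  {
    List<List<String>> expired = new ArrayList<>();
    lastUpdateMillis.forEach((labels, updatedAt) -> {
      // The two-argument remove() skips entries refreshed since we read them.
      if (nowMillis - updatedAt > ttlMillis && lastUpdateMillis.remove(labels, updatedAt)) {
        expired.add(labels);
      }
    });
    return expired;
  }
}
```

With this shape, a frequently updated series such as one task ID stays registered while idle series of the same metric are dropped individually.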



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -55,4 +61,20 @@ public double[] getHistogramBuckets()
   {
     return histogramBuckets;
   }
+
+  public void resetLastUpdateTime()
+  {
+    updateTimer.reset();
+    updateTimer.start();

Review Comment:
   nit: can be simplified to `updateTimer.restart();`



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +93,18 @@ public void start()
       } else {
         log.error("HTTPServer is already started");
       }
+      // Start TTL scheduler if TTL is configured
+      if (config.getFlushPeriod() != null) {
+        exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+        // Check TTL every minute
+        exec.scheduleAtFixedRate(
+            this::cleanUpStaleMetrics,
+            config.getFlushPeriod(),
+            config.getFlushPeriod(),
+            TimeUnit.SECONDS
+        );
+        log.info("Started TTL scheduler with TTL of %d seconds", 
config.getFlushPeriod());

Review Comment:
   ```suggestion
           log.info("Started TTL scheduler with TTL of [%d] seconds", config.getFlushPeriod());
   ```



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java:
##########
@@ -97,6 +97,9 @@ public PrometheusEmitterConfig(
     Preconditions.checkArgument(PATTERN.matcher(this.namespace).matches(), 
"Invalid namespace " + this.namespace);
     if (strategy == Strategy.exporter) {
       Preconditions.checkArgument(port != null, "For `exporter` strategy, port 
must be specified.");
+      if (Objects.nonNull(flushPeriod)) {
+        Preconditions.checkArgument(flushPeriod > 0, "flushPeriod must be 
greater than 0.");
+      }

Review Comment:
   - It would be good to include the invalid `flushPeriod` in the error message.
   - Looks like the other validations use this too; newer ones could use 
`DruidException`. Something like:
   
   ```
   throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                       .ofCategory(DruidException.Category.INVALID_INPUT)
                       .build(
                           "Invalid value for 'druid.emitter.prometheus.flushPeriod'[%s] specified, flushPeriod must be > 0 if specified.",
                           flushPeriod
                       );
   ```



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java:
##########
@@ -129,6 +134,18 @@ public PrometheusEmitterConfig(
                           );
     }
 
+    if (metricTtlMs != null && metricTtlMs <= 0) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(
+                              StringUtils.format(
+                                  "Invalid value for metricTtlMs[%s] 
specified, metricTtlMs must be > 0.",
+                                  metricTtlMs

Review Comment:
   nit: you could skip `StringUtils.format()` and pass the string placeholder 
since `build()` takes variable args already



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +278,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void checkMetricTtl()
+  {
+    if (config.getMetricTtlMs() == null) {
+      return;
+    }
+
+    Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+    for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+      if (entry.getValue().isExpired(config.getMetricTtlMs())) {

Review Comment:
   `entry.getValue()` can be extracted to a variable in this loop and re-used 
since it's used multiple times in this code block



##########
docs/development/extensions-contrib/prometheus.md:
##########
@@ -44,7 +44,7 @@ All the configuration parameters for the Prometheus emitter are under `druid.emi
 | `druid.emitter.prometheus.addHostAsLabel`     | Flag to include the hostname as a prometheus label. | no        | false |
 | `druid.emitter.prometheus.addServiceAsLabel`  | Flag to include the druid service name (e.g. `druid/broker`, `druid/coordinator`, etc.) as a prometheus label. | no        | false |
 | `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address. Required if using `pushgateway` strategy. | no        | none |
-| `druid.emitter.prometheus.flushPeriod`        | Emit metrics to Pushgateway every `flushPeriod` seconds. Required if `pushgateway` strategy is used. | no        | 15 |
+| `druid.emitter.prometheus.flushPeriod`        | If strategy is `pushgateway`, emits metrics every `flushPeriod` seconds. Required if `pushgateway` strategy is used. If strategy is `exporter`, configures the metric ttl such that if the metric value is not updated within the `flushPeriod` then it will stop being emitted. Optional if `exporter` strategy is used. | no        | 15 seconds for `pushgateway` strategy. None for `exporter` strategy |
Review Comment:
   Thanks for the docs! Minor suggestion:
   ```suggestion
   | `druid.emitter.prometheus.flushPeriod`        | If strategy is `pushgateway`, metrics are emitted every `flushPeriod` seconds. Required if `pushgateway` strategy is used. If strategy is `exporter`, this configures the metric TTL such that if the metric value is not updated within the `flushPeriod` seconds, then it will stop being emitted. Optional if `exporter` strategy is used. | no        | 15 seconds for `pushgateway` strategy. None for `exporter` strategy |
   ```
   
   Also, do you have guidance on what an operator should set this value to for 
TTL eviction (perhaps something as a function of emission period)?



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -211,6 +226,9 @@ public void close()
       if (server != null) {
         server.close();
       }
+      if (exec != null) {
+        exec.shutdownNow();
+      }

Review Comment:
   We could reverse the ordering of operations in `close()` since it's 
generally good practice to close resources in the reverse order of how they 
were initialized in `start()`; move this before the code block where we do 
`server.close()` 
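
A small sketch of the reverse-order idea, assuming shutdown actions are registered as each resource starts. `ReverseOrderCloser` is a hypothetical illustration, not the emitter's actual structure; in the PR the same effect comes from simply placing the executor shutdown before `server.close()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration: resources are released in reverse order of acquisition. */
class ReverseOrderCloser
{
  private final Deque<Runnable> closers = new ArrayDeque<>();

  /** Register a shutdown action right after the matching resource starts. */
  void onClose(Runnable closer)
  {
    closers.push(closer); // push() adds to the head, so the newest closer runs first
  }

  void close()
  {
    while (!closers.isEmpty()) {
      closers.pop().run();
    }
  }

  /** Demo: the "server" starts first and the "ttlExecutor" second. */
  static List<String> demo()
  {
    List<String> closed = new ArrayList<>();
    ReverseOrderCloser lifecycle = new ReverseOrderCloser();
    lifecycle.onClose(() -> closed.add("server"));
    lifecycle.onClose(() -> closed.add("ttlExecutor"));
    lifecycle.close();
    return closed;
  }
}
```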



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void cleanUpStaleMetrics()
+  {
+    if (config.getFlushPeriod() == null) {
+      return;
+    }
+
+    Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+    for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+      if (entry.getValue().isExpired(config.getFlushPeriod())) {
+        log.debug("Metric [%s] has expired (last updated %d ms ago)",
+                 entry.getKey(),
+                 entry.getValue().getTimeSinceLastUpdate());
+        entry.getValue().getCollector().clear();

Review Comment:
   Is this `clear()` operation on `SimpleCollector` thread-safe? A quick scan 
of the 
[docs](https://prometheus.io/docs/instrumenting/writing_clientlibs/#overall-structure)
 suggests that the client libraries must be thread-safe, but I wanted to 
confirm that clearing can happen concurrently with the metric updates in 
`emitMetric()`.
   
   
   Side note: I think it would be better to move some of these methods inside 
`DimensionsAndCollector` to avoid abstraction leaks. But that refactor can be 
done later since it'd also require moving the metric updates from 
`emitMetric()` into this class.



##########
extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java:
##########
@@ -430,4 +431,78 @@ public void testEmitterWithoutDeleteOnShutdown() throws 
IOException
 
     EasyMock.verify(mockPushGateway);
   }
+
+  @Test
+  public void testMetricTtlExpiration()
+  {
+    int flushPeriod = 3;
+    PrometheusEmitterConfig config = new 
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", 
null, 0, null, false, true, flushPeriod, null, false, null);
+    PrometheusEmitter emitter = new PrometheusEmitter(config);
+    emitter.start();
+
+    ServiceMetricEvent event = ServiceMetricEvent.builder()
+                                                 
.setMetric("segment/loadQueue/count", 10)
+                                                 
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+    emitter.emit(event);
+
+    // Get the metrics and check that it's not expired initially
+    Map<String, DimensionsAndCollector> registeredMetrics = 
emitter.getMetrics().getRegisteredMetrics();
+    DimensionsAndCollector testMetric = 
registeredMetrics.get("segment/loadQueue/count");
+
+    Assert.assertNotNull("Test metric should be registered", testMetric);
+    Assert.assertFalse("Metric should not be expired initially",
+                       testMetric.isExpired(flushPeriod));
+
+    // Wait for the metric to expire (ttl + 1 second buffer)
+    try {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(flushPeriod) + 1000);
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    Assert.assertTrue("Metric should be expired after TTL",
+                      testMetric.isExpired(flushPeriod));
+    emitter.close();
+  }
+
+  @Test
+  public void testMetricTtlUpdate()
+  {
+    int flushPeriod = 3;
+    PrometheusEmitterConfig config = new 
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", 
null, 0, null, false, true, flushPeriod, null, false, null);
+    PrometheusEmitter emitter = new PrometheusEmitter(config);
+    emitter.start();
+
+    ServiceMetricEvent event = ServiceMetricEvent.builder()
+                                                 
.setMetric("segment/loadQueue/count", 10)
+                                                 
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+    emitter.emit(event);

Review Comment:
   Could you add the same metric with different label values so that it acts 
as a different time series, and then verify that one exists while the other 
has expired?



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +278,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void checkMetricTtl()

Review Comment:
   On the naming, something like `expireStaleMetrics()` or 
`clearExpiredMetrics()` is clearer. What do you think?
   
   Could you please add a brief javadoc on how the expiration is done?
   



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void cleanUpStaleMetrics()

Review Comment:
   Could you please add a javadoc here? Or for the `exec` member?



##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
   {
     this.pushGateway = pushGateway;
   }
+
+  private void cleanUpStaleMetrics()
+  {
+    if (config.getFlushPeriod() == null) {
+      return;
+    }
+
+    Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+    for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+      if (entry.getValue().isExpired(config.getFlushPeriod())) {

Review Comment:
   `DimensionsAndCollector` could accept the flush period directly, initialized 
from the Prometheus config that’s plumbed through `Metrics`. Then this code 
could become `if (entry.getValue().isExpired()) {}` where this fixed config 
isn't redundantly passed in
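
A rough sketch of that shape, assuming a trimmed-down stand-in for `DimensionsAndCollector`. `ExpiringMetric` is a hypothetical name, and the injected nanosecond clock is an assumption added only to make the example testable; production code would likely pass `System::nanoTime`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical stand-in for DimensionsAndCollector that owns its TTL, so
 * callers no longer pass the configured period on every check.
 */
class ExpiringMetric
{
  private final long ttlNanos;
  private final LongSupplier nanoClock;
  private volatile long lastUpdateNanos;

  ExpiringMetric(long ttlSeconds, LongSupplier nanoClock)
  {
    this.ttlNanos = TimeUnit.SECONDS.toNanos(ttlSeconds);
    this.nanoClock = nanoClock;
    this.lastUpdateNanos = nanoClock.getAsLong();
  }

  /** Called whenever the metric value is updated. */
  void resetLastUpdateTime()
  {
    lastUpdateNanos = nanoClock.getAsLong();
  }

  /** No argument needed: the TTL was fixed at construction time. */
  boolean isExpired()
  {
    return nanoClock.getAsLong() - lastUpdateNanos > ttlNanos;
  }
}
```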



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

