abhishekrb19 commented on code in PR #18598:
URL: https://github.com/apache/druid/pull/18598#discussion_r2404184040
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -20,20 +20,23 @@
package org.apache.druid.emitter.prometheus;
import io.prometheus.client.SimpleCollector;
+import java.util.concurrent.atomic.AtomicLong;
public class DimensionsAndCollector
{
private final String[] dimensions;
private final SimpleCollector collector;
private final double conversionFactor;
private final double[] histogramBuckets;
+ private final AtomicLong lastUpdateTime;
Review Comment:
If it is reset to a previous value, would that still show up as a separate
time series?
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +94,18 @@ public void start()
} else {
log.error("HTTPServer is already started");
}
+ // Start TTL scheduler if TTL is configured
+ if (config.getMetricTtlMs() != null) {
+ ttlExecutor = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+ // Check TTL every minute
+ ttlExecutor.scheduleAtFixedRate(
+ this::checkMetricTtl,
+ config.getMetricTtlMs(),
+ config.getMetricTtlMs(),
+ TimeUnit.MILLISECONDS
Review Comment:
Does this need to be running every millisecond? Seems like running this at
`TimeUnit.SECONDS` or even at like 5s or so should suffice since the default
metric emission period is 1 minute.
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java:
##########
@@ -86,10 +90,11 @@ public PrometheusEmitterConfig(
@JsonProperty("pushGatewayAddress") @Nullable String pushGatewayAddress,
@JsonProperty("addHostAsLabel") boolean addHostAsLabel,
@JsonProperty("addServiceAsLabel") boolean addServiceAsLabel,
- @JsonProperty("flushPeriod") Integer flushPeriod,
+ @JsonProperty("flushPeriod") @Nullable Integer flushPeriod,
@JsonProperty("extraLabels") @Nullable Map<String, String> extraLabels,
@JsonProperty("deletePushGatewayMetricsOnShutdown") @Nullable Boolean
deletePushGatewayMetricsOnShutdown,
- @JsonProperty("waitForShutdownDelay") @Nullable Long waitForShutdownDelay
+ @JsonProperty("waitForShutdownDelay") @Nullable Long
waitForShutdownDelay,
+ @JsonProperty("metricTtlMs") @Nullable Long metricTtlMs
Review Comment:
Could you update the prometheus emitter docs to add this property? Do you
have guidance on what a good value for `metricTtlMs` might be that will be good
for most users?
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +278,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void checkMetricTtl()
+ {
+ if (config.getMetricTtlMs() == null) {
+ return;
+ }
+
+ Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+ for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+ if (entry.getValue().isExpired(config.getMetricTtlMs())) {
+ log.info("Metric [%s] has expired (last updated %d ms ago)",
Review Comment:
```suggestion
log.info("Clearing metric[%s] since it has expired (last updated [%,d] ms ago)",
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void cleanUpStaleMetrics()
+ {
+ if (config.getFlushPeriod() == null) {
+ return;
+ }
+
+ Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+ for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+ if (entry.getValue().isExpired(config.getFlushPeriod())) {
Review Comment:
Extract `entry.getValue()` to a variable and reuse since it's used a few
times in this loop :)
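A minimal sketch of that extraction, assuming a hypothetical `Holder` class as a stand-in for `DimensionsAndCollector` (its methods are made up for illustration and are not the PR's actual API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for DimensionsAndCollector; not the PR's actual class.
class Holder
{
  private final long millisSinceLastUpdate;
  private boolean cleared = false;

  Holder(long millisSinceLastUpdate)
  {
    this.millisSinceLastUpdate = millisSinceLastUpdate;
  }

  boolean isExpired(long ttlMillis)
  {
    return millisSinceLastUpdate > ttlMillis;
  }

  void clear()
  {
    cleared = true;
  }

  boolean isCleared()
  {
    return cleared;
  }
}

class StaleMetricCleaner
{
  // Clears every expired holder and returns the names of the cleared metrics.
  static List<String> cleanUp(Map<String, Holder> registered, long ttlMillis)
  {
    final List<String> clearedNames = new ArrayList<>();
    for (Map.Entry<String, Holder> entry : registered.entrySet()) {
      // Read the value once and reuse it instead of repeating entry.getValue().
      final Holder holder = entry.getValue();
      if (holder.isExpired(ttlMillis)) {
        holder.clear();
        clearedNames.add(entry.getKey());
      }
    }
    return clearedNames;
  }
}
```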
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +93,18 @@ public void start()
} else {
log.error("HTTPServer is already started");
}
+ // Start TTL scheduler if TTL is configured
+ if (config.getFlushPeriod() != null) {
+ exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+ // Check TTL every minute
Review Comment:
I think this comment is stale?
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void cleanUpStaleMetrics()
+ {
+ if (config.getFlushPeriod() == null) {
+ return;
+ }
+
+ Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+ for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+ if (entry.getValue().isExpired(config.getFlushPeriod())) {
+ log.debug("Metric [%s] has expired (last updated %d ms ago)",
+ entry.getKey(),
+ entry.getValue().getTimeSinceLastUpdate());
+ entry.getValue().getCollector().clear();
Review Comment:
It looks like `clear()` removes all children for the metric. Do we instead
want to use `remove()` to target a specific time series by specifying the
corresponding set of label values?
Without that, metrics that track things like task ID or query ID, for
example, `task/run/time` or `query/wait/time` mentioned in
https://github.com/apache/druid/issues/14638 wouldn’t get evicted if they’re
being updated frequently.
When we call `metric.resetLastUpdateTime()`, should we pass the appropriate
label set and track it? Then in `cleanUpStaleMetrics()`, we could check the
label sets and clear only the stale time series rather than clearing all of
them or none at all. What do you think?
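To make the idea concrete, here is a rough sketch of per-label-set tracking. `LabelSetTtlTracker` and its methods are hypothetical names, not code from the PR; the `remover` callback is an assumption standing in for a call to the collector's `remove()` with the given label values. A concurrent update that lands between the map removal and the callback is not handled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: tracks the last update time of each label-value set.
class LabelSetTtlTracker
{
  private final Map<List<String>, Long> lastUpdateMillis = new ConcurrentHashMap<>();
  private final long ttlMillis;

  LabelSetTtlTracker(long ttlMillis)
  {
    this.ttlMillis = ttlMillis;
  }

  // Called whenever the time series with these label values is updated.
  void markUpdated(long nowMillis, String... labelValues)
  {
    lastUpdateMillis.put(Arrays.asList(labelValues.clone()), nowMillis);
  }

  // Evicts only the label sets not updated within the TTL and passes each one
  // to `remover` (assumed to remove that single time series from the collector).
  List<List<String>> evictStale(long nowMillis, Consumer<String[]> remover)
  {
    final List<List<String>> evicted = new ArrayList<>();
    for (Map.Entry<List<String>, Long> entry : lastUpdateMillis.entrySet()) {
      final List<String> labelValues = entry.getKey();
      final Long lastSeen = entry.getValue();
      // The conditional remove skips entries refreshed since lastSeen was read.
      if (nowMillis - lastSeen > ttlMillis && lastUpdateMillis.remove(labelValues, lastSeen)) {
        remover.accept(labelValues.toArray(new String[0]));
        evicted.add(labelValues);
      }
    }
    return evicted;
  }
}
```

With this shape, a frequently updated metric such as `task/run/time` keeps its live series while series for finished tasks age out individually.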
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -55,4 +61,20 @@ public double[] getHistogramBuckets()
{
return histogramBuckets;
}
+
+ public void resetLastUpdateTime()
+ {
+ updateTimer.reset();
+ updateTimer.start();
Review Comment:
nit: can be simplified to `updateTimer.restart();`
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +93,18 @@ public void start()
} else {
log.error("HTTPServer is already started");
}
+ // Start TTL scheduler if TTL is configured
+ if (config.getFlushPeriod() != null) {
+ exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+ // Check TTL every minute
+ exec.scheduleAtFixedRate(
+ this::cleanUpStaleMetrics,
+ config.getFlushPeriod(),
+ config.getFlushPeriod(),
+ TimeUnit.SECONDS
+ );
+ log.info("Started TTL scheduler with TTL of %d seconds",
config.getFlushPeriod());
Review Comment:
```suggestion
log.info("Started TTL scheduler with TTL of [%d] seconds", config.getFlushPeriod());
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java:
##########
@@ -97,6 +97,9 @@ public PrometheusEmitterConfig(
Preconditions.checkArgument(PATTERN.matcher(this.namespace).matches(),
"Invalid namespace " + this.namespace);
if (strategy == Strategy.exporter) {
Preconditions.checkArgument(port != null, "For `exporter` strategy, port
must be specified.");
+ if (Objects.nonNull(flushPeriod)) {
+ Preconditions.checkArgument(flushPeriod > 0, "flushPeriod must be
greater than 0.");
+ }
Review Comment:
- It would be good to include the invalid `flushPeriod` value in the error message.
- It looks like the other validation uses this too, but newer ones could use
`DruidException`. Something like:
```
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
"Invalid value for 'druid.emitter.prometheus.flushPeriod'[%s] specified, flushPeriod must be > 0 if specified.",
flushPeriod
);
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java:
##########
@@ -129,6 +134,18 @@ public PrometheusEmitterConfig(
);
}
+ if (metricTtlMs != null && metricTtlMs <= 0) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ StringUtils.format(
+ "Invalid value for metricTtlMs[%s]
specified, metricTtlMs must be > 0.",
+ metricTtlMs
Review Comment:
nit: you could skip `StringUtils.format()` and pass the string placeholder
since `build()` takes variable args already
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +278,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void checkMetricTtl()
+ {
+ if (config.getMetricTtlMs() == null) {
+ return;
+ }
+
+ Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+ for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+ if (entry.getValue().isExpired(config.getMetricTtlMs())) {
Review Comment:
`entry.getValue()` can be extracted to a variable in this loop and re-used
since it's used multiple times in this code block
##########
docs/development/extensions-contrib/prometheus.md:
##########
@@ -44,7 +44,7 @@ All the configuration parameters for the Prometheus emitter
are under `druid.emi
| `druid.emitter.prometheus.addHostAsLabel` | Flag to include the hostname
as a prometheus label.
| no | false
|
| `druid.emitter.prometheus.addServiceAsLabel` | Flag to include the druid
service name (e.g. `druid/broker`, `druid/coordinator`, etc.) as a prometheus
label.
| no | false
|
| `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address.
Required if using `pushgateway` strategy.
| no | none
|
-| `druid.emitter.prometheus.flushPeriod` | Emit metrics to Pushgateway
every `flushPeriod` seconds. Required if `pushgateway` strategy is used.
| no | 15
|
+| `druid.emitter.prometheus.flushPeriod` | If strategy is
`pushgateway`, emits metrics every `flushPeriod` seconds. Required if
`pushgateway` strategy is used. If strategy is `exporter`, configures the
metric ttl such that if the metric value is not updated within the
`flushPeriod` then it will stop being emitted. Optional if `exporter` strategy
is used.
| no | 15
seconds for `pushgateway` strategy. None for `exporter` strategy
|
Review Comment:
Thanks for the docs! Minor suggestion:
```suggestion
| `druid.emitter.prometheus.flushPeriod` | If strategy is `pushgateway`, metrics are emitted every `flushPeriod` seconds. Required if `pushgateway` strategy is used. If strategy is `exporter`, this configures the metric TTL such that if the metric value is not updated within the `flushPeriod` seconds, then it will stop being emitted. Optional if `exporter` strategy is used. | no | 15 seconds for `pushgateway` strategy. None for `exporter` strategy |
```
Also, do you have guidance on what an operator should set this value to for
TTL eviction (perhaps something as a function of emission period)?
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -211,6 +226,9 @@ public void close()
if (server != null) {
server.close();
}
+ if (exec != null) {
+ exec.shutdownNow();
+ }
Review Comment:
We could reverse the ordering of operations in `close()` since it's
generally good practice to close resources in the reverse order of how they
were initialized in `start()`; move this before the code block where we do
`server.close()`
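A small sketch of the suggested ordering, using hypothetical stand-ins: `serverStopper` is a plain `Runnable` in place of the real HTTP server, and the recorded `closeOrder` list exists only to make the order observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical lifecycle sketch; field names only mirror the PR's `server` and `exec`.
class ReverseOrderLifecycle
{
  private final List<String> closeOrder = new ArrayList<>();
  private Runnable serverStopper;          // stand-in for the HTTP server
  private ScheduledExecutorService exec;   // cleanup scheduler

  void start()
  {
    // The server is initialized first, the executor second.
    serverStopper = () -> closeOrder.add("server");
    exec = Executors.newSingleThreadScheduledExecutor();
  }

  void close()
  {
    // Shut down in reverse order: the executor was started last, so it stops first.
    if (exec != null) {
      exec.shutdownNow();
      closeOrder.add("exec");
    }
    if (serverStopper != null) {
      serverStopper.run();
    }
  }

  List<String> getCloseOrder()
  {
    return closeOrder;
  }
}
```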
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void cleanUpStaleMetrics()
+ {
+ if (config.getFlushPeriod() == null) {
+ return;
+ }
+
+ Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+ for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+ if (entry.getValue().isExpired(config.getFlushPeriod())) {
+ log.debug("Metric [%s] has expired (last updated %d ms ago)",
+ entry.getKey(),
+ entry.getValue().getTimeSinceLastUpdate());
+ entry.getValue().getCollector().clear();
Review Comment:
Is this `clear()` operation on `SimpleCollector` thread-safe? A quick scan of
the
[docs](https://prometheus.io/docs/instrumenting/writing_clientlibs/#overall-structure)
suggests that the client libraries must be thread-safe, but I wanted to confirm
that clearing can safely happen concurrently with the metric updates in
`emitMetric()`.
Side note: I think it would be better to move some of these methods inside
`DimensionsAndCollector` to avoid abstraction leaks. But that refactor can be
done later since it'd also require moving the metric updates from
`emitMetric()` into this class.
##########
extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java:
##########
@@ -430,4 +431,78 @@ public void testEmitterWithoutDeleteOnShutdown() throws
IOException
EasyMock.verify(mockPushGateway);
}
+
+ @Test
+ public void testMetricTtlExpiration()
+ {
+ int flushPeriod = 3;
+ PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, false, true, flushPeriod, null, false, null);
+ PrometheusEmitter emitter = new PrometheusEmitter(config);
+ emitter.start();
+
+ ServiceMetricEvent event = ServiceMetricEvent.builder()
+
.setMetric("segment/loadQueue/count", 10)
+
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+ emitter.emit(event);
+
+ // Get the metrics and check that it's not expired initially
+ Map<String, DimensionsAndCollector> registeredMetrics =
emitter.getMetrics().getRegisteredMetrics();
+ DimensionsAndCollector testMetric =
registeredMetrics.get("segment/loadQueue/count");
+
+ Assert.assertNotNull("Test metric should be registered", testMetric);
+ Assert.assertFalse("Metric should not be expired initially",
+ testMetric.isExpired(flushPeriod));
+
+ // Wait for the metric to expire (ttl + 1 second buffer)
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(flushPeriod) + 1000);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ Assert.assertTrue("Metric should be expired after TTL",
+ testMetric.isExpired(flushPeriod));
+ emitter.close();
+ }
+
+ @Test
+ public void testMetricTtlUpdate()
+ {
+ int flushPeriod = 3;
+ PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, false, true, flushPeriod, null, false, null);
+ PrometheusEmitter emitter = new PrometheusEmitter(config);
+ emitter.start();
+
+ ServiceMetricEvent event = ServiceMetricEvent.builder()
+
.setMetric("segment/loadQueue/count", 10)
+
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+ emitter.emit(event);
Review Comment:
Could you add the same metric with different label values so that it acts
as a different time series and then verify that one exists while the other one
has expired?
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +278,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void checkMetricTtl()
Review Comment:
On the naming, something like `expireStaleMetrics()` or
`clearExpiredMetrics()` is clearer. What do you think?
Could you please add a brief javadoc on how the expiration is done?
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void cleanUpStaleMetrics()
Review Comment:
Could you please add a javadoc here? Or for the `exec` member?
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void cleanUpStaleMetrics()
+ {
+ if (config.getFlushPeriod() == null) {
+ return;
+ }
+
+ Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+ for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+ if (entry.getValue().isExpired(config.getFlushPeriod())) {
Review Comment:
`DimensionsAndCollector` could accept the flush period directly, initialized
from the Prometheus config that’s plumbed through `Metrics`. Then this code
could become `if (entry.getValue().isExpired()) {}` where this fixed config
isn't redundantly passed in
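One possible shape for this, as a sketch only: `TtlAwareHolder`, its constructor, and the injectable `nanoClock` are assumptions made for illustration and do not reflect the PR's actual `DimensionsAndCollector`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical variant of the holder that owns its TTL instead of receiving it per call.
class TtlAwareHolder
{
  private final Long ttlSeconds;          // null means no TTL is configured
  private final LongSupplier nanoClock;   // e.g. System::nanoTime; injectable for tests
  private final AtomicLong lastUpdateNanos;

  TtlAwareHolder(Long ttlSeconds, LongSupplier nanoClock)
  {
    this.ttlSeconds = ttlSeconds;
    this.nanoClock = nanoClock;
    this.lastUpdateNanos = new AtomicLong(nanoClock.getAsLong());
  }

  // Called whenever the metric is updated.
  void resetLastUpdateTime()
  {
    lastUpdateNanos.set(nanoClock.getAsLong());
  }

  // No argument needed: the fixed TTL was supplied once at construction.
  boolean isExpired()
  {
    if (ttlSeconds == null) {
      return false;
    }
    final long elapsedNanos = nanoClock.getAsLong() - lastUpdateNanos.get();
    return elapsedNanos > TimeUnit.SECONDS.toNanos(ttlSeconds);
  }
}
```

Passing the clock in is a design choice of this sketch, not something the review asks for; it would let a test advance time directly rather than sleeping.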