kfaraz commented on code in PR #18598:
URL: https://github.com/apache/druid/pull/18598#discussion_r2409293395
##########
docs/development/extensions-contrib/prometheus.md:
##########
@@ -44,7 +44,7 @@ All the configuration parameters for the Prometheus emitter are under `druid.emi
| `druid.emitter.prometheus.addHostAsLabel` | Flag to include the hostname as a prometheus label. | no | false |
| `druid.emitter.prometheus.addServiceAsLabel` | Flag to include the druid service name (e.g. `druid/broker`, `druid/coordinator`, etc.) as a prometheus label. | no | false |
| `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address. Required if using `pushgateway` strategy. | no | none |
-| `druid.emitter.prometheus.flushPeriod` | Emit metrics to Pushgateway every `flushPeriod` seconds. Required if `pushgateway` strategy is used. | no | 15 |
+| `druid.emitter.prometheus.flushPeriod` | If strategy is `pushgateway`, emits metrics every `flushPeriod` seconds. Required if `pushgateway` strategy is used. If strategy is `exporter`, configures the metric ttl such that if the metric value is not updated within the `flushPeriod` then it will stop being emitted. Optional if `exporter` strategy is used. | no | 15 seconds for `pushgateway` strategy. None for `exporter` strategy |
Review Comment:
```suggestion
| `druid.emitter.prometheus.flushPeriod` | If strategy is `pushgateway`, emits metrics every `flushPeriod` seconds. If strategy is `exporter`, sets the metric TTL: a metric whose value is not updated within `flushPeriod` seconds stops being emitted. | Required if `pushgateway` strategy is used, optional otherwise. | 15 seconds for `pushgateway` strategy. None for `exporter` strategy. |
```
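For illustration, a minimal sketch of the defaults described in the suggested row. `FlushPeriodResolver` is a hypothetical helper, not part of this PR; the lower-case `Strategy` constants only mirror the `PrometheusEmitterConfig.Strategy.exporter` constant used in the PR's test.

```java
/**
 * Hypothetical sketch of the documented flushPeriod defaults; names are illustrative only.
 */
class FlushPeriodResolver
{
  enum Strategy { exporter, pushgateway }

  static final int DEFAULT_PUSHGATEWAY_FLUSH_PERIOD_SECONDS = 15;

  /**
   * Returns the effective flushPeriod in seconds, or null when no TTL applies.
   */
  static Integer resolve(Strategy strategy, Integer configuredSeconds)
  {
    if (configuredSeconds != null) {
      // An explicit value wins for both strategies.
      return configuredSeconds;
    }
    // pushgateway: push interval defaults to 15 seconds.
    // exporter: no default, so metrics never expire.
    return strategy == Strategy.pushgateway ? DEFAULT_PUSHGATEWAY_FLUSH_PERIOD_SECONDS : null;
  }
}
```

With `exporter` and no configured value the result is `null`, which matches the "None for `exporter` strategy" default: no TTL scheduler would be started.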
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +93,18 @@ public void start()
} else {
log.error("HTTPServer is already started");
}
+ // Start TTL scheduler if TTL is configured
+ if (config.getFlushPeriod() != null) {
+ exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+ // Check TTL every minute
+ exec.scheduleAtFixedRate(
+ this::cleanUpStaleMetrics,
+ config.getFlushPeriod(),
+ config.getFlushPeriod(),
+ TimeUnit.SECONDS
+ );
+ log.info("Started TTL scheduler with TTL of %d seconds", config.getFlushPeriod());
Review Comment:
```suggestion
log.info("Started TTL scheduler with TTL of [%d] seconds.", config.getFlushPeriod());
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -93,6 +93,18 @@ public void start()
} else {
log.error("HTTPServer is already started");
}
+ // Start TTL scheduler if TTL is configured
+ if (config.getFlushPeriod() != null) {
+ exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
+ // Check TTL every minute
Review Comment:
I don't think this comment is valid anymore: the check now runs every `flushPeriod` seconds, not every minute. Please remove it.
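A minimal sketch of the scheduling in this hunk, using plain `java.util.concurrent` instead of Druid's `ScheduledExecutors`: the first check runs after one `flushPeriod`, then every `flushPeriod` seconds. `TtlMetricRegistry` is hypothetical. The excerpt does not show what `cleanUpStaleMetrics` does after detecting an expired metric, so this sketch simply drops the entry. The clock is injected so the cleanup can be tested without sleeping.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: remembers when each metric was last updated and,
 * every ttlSeconds, drops metrics that were not updated within the TTL.
 */
class TtlMetricRegistry
{
  private final Map<String, Long> lastUpdateMillis = new ConcurrentHashMap<>();
  private final long ttlSeconds;
  private final LongSupplier clockMillis;
  private ScheduledExecutorService exec;

  TtlMetricRegistry(long ttlSeconds, LongSupplier clockMillis)
  {
    if (ttlSeconds <= 0) {
      // scheduleAtFixedRate rejects non-positive periods.
      throw new IllegalArgumentException("ttlSeconds must be positive");
    }
    this.ttlSeconds = ttlSeconds;
    this.clockMillis = clockMillis;
  }

  public void update(String metric)
  {
    lastUpdateMillis.put(metric, clockMillis.getAsLong());
  }

  public boolean contains(String metric)
  {
    return lastUpdateMillis.containsKey(metric);
  }

  public void start()
  {
    exec = Executors.newSingleThreadScheduledExecutor();
    // Initial delay and period are both the TTL, as in the hunk above.
    exec.scheduleAtFixedRate(this::cleanUpStaleMetrics, ttlSeconds, ttlSeconds, TimeUnit.SECONDS);
  }

  public void stop()
  {
    if (exec != null) {
      exec.shutdownNow();
    }
  }

  public void cleanUpStaleMetrics()
  {
    long now = clockMillis.getAsLong();
    long ttlMillis = TimeUnit.SECONDS.toMillis(ttlSeconds);
    // Removing through the entry-set view is safe on a ConcurrentHashMap.
    lastUpdateMillis.entrySet().removeIf(e -> now - e.getValue() >= ttlMillis);
  }
}
```

Because the check only runs once per period, a metric can stay visible for up to roughly twice the TTL after its last update. Checking more often would tighten that bound at the cost of more frequent scans.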
##########
extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java:
##########
@@ -430,4 +431,78 @@ public void testEmitterWithoutDeleteOnShutdown() throws IOException
EasyMock.verify(mockPushGateway);
}
+
+ @Test
+ public void testMetricTtlExpiration()
+ {
+ int flushPeriod = 3;
+ PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", null, 0, null, false, true, flushPeriod, null, false, null);
+ PrometheusEmitter emitter = new PrometheusEmitter(config);
+ emitter.start();
+
+ ServiceMetricEvent event = ServiceMetricEvent.builder()
+ .setMetric("segment/loadQueue/count", 10)
+ .build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+ emitter.emit(event);
+
+ // Get the metrics and check that it's not expired initially
+ Map<String, DimensionsAndCollector> registeredMetrics = emitter.getMetrics().getRegisteredMetrics();
+ DimensionsAndCollector testMetric = registeredMetrics.get("segment/loadQueue/count");
+
+ Assert.assertNotNull("Test metric should be registered", testMetric);
+ Assert.assertFalse("Metric should not be expired initially", testMetric.isExpired(flushPeriod));
Review Comment:
Style: it's cleaner to put the first argument on a new line. Please use the same style in other places too.
```suggestion
Assert.assertFalse(
"Metric should not be expired initially",
testMetric.isExpired(flushPeriod)
);
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -55,4 +61,20 @@ public double[] getHistogramBuckets()
{
return histogramBuckets;
}
+
+ public void resetLastUpdateTime()
+ {
+ updateTimer.reset();
+ updateTimer.start();
+ }
+
+ public long getTimeSinceLastUpdate()
Review Comment:
```suggestion
public long getMillisSinceLastUpdate()
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -256,4 +279,21 @@ public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
+
+ private void cleanUpStaleMetrics()
+ {
+ if (config.getFlushPeriod() == null) {
+ return;
+ }
+
+ Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
+ for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
+ if (entry.getValue().isExpired(config.getFlushPeriod())) {
+ log.debug("Metric [%s] has expired (last updated %d ms ago)", entry.getKey(), entry.getValue().getTimeSinceLastUpdate());
Review Comment:
Style:
```suggestion
log.debug(
"Metric[%s] has expired as it was last updated [%d] ms ago.",
entry.getKey(),
entry.getValue().getTimeSinceLastUpdate()
);
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -55,4 +61,20 @@ public double[] getHistogramBuckets()
{
return histogramBuckets;
}
+
+ public void resetLastUpdateTime()
+ {
+ updateTimer.reset();
+ updateTimer.start();
+ }
+
+ public long getTimeSinceLastUpdate()
+ {
+ return updateTimer.millisElapsed();
+ }
+
+ public boolean isExpired(long ttlSeconds)
+ {
+ return updateTimer.hasElapsed(new Duration(TimeUnit.SECONDS.toMillis(ttlSeconds)));
Review Comment:
```suggestion
return updateTimer.hasElapsed(Duration.standardSeconds(ttlSeconds));
```
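A minimal sketch of the timer logic with both suggestions applied: the `getMillisSinceLastUpdate` name and a seconds-based `Duration` factory. It uses `java.time.Duration.ofSeconds`, the JDK counterpart of Joda-Time's `Duration.standardSeconds`. `UpdateTimer` and its injected nanosecond clock are hypothetical stand-ins for the stopwatch behind `updateTimer`. Treating an elapsed time exactly equal to the TTL as expired is an assumption of this sketch.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical stand-in for the update timer in DimensionsAndCollector.
 */
class UpdateTimer
{
  private final LongSupplier nanoClock;
  private long lastUpdateNanos;

  UpdateTimer(LongSupplier nanoClock)
  {
    this.nanoClock = nanoClock;
    this.lastUpdateNanos = nanoClock.getAsLong();
  }

  /** Counterpart of resetLastUpdateTime(): restart timing from "now". */
  public void resetLastUpdateTime()
  {
    lastUpdateNanos = nanoClock.getAsLong();
  }

  public long getMillisSinceLastUpdate()
  {
    return TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong() - lastUpdateNanos);
  }

  public boolean isExpired(long ttlSeconds)
  {
    // Assumption: elapsed == TTL counts as expired.
    return getMillisSinceLastUpdate() >= Duration.ofSeconds(ttlSeconds).toMillis();
  }
}
```

In production the clock would be `System::nanoTime`, which is monotonic and therefore safer than wall-clock time for measuring elapsed intervals.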
--
This is an automated message from the Apache Git Service.