This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new d95857a1559 KAFKA-19504: Remove unused metrics reporter initialization
in KafkaAdminClient (#20166)
d95857a1559 is described below
commit d95857a155943013fe6da5d613b505e6e9688c16
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Jul 14 20:19:16 2025 -0400
KAFKA-19504: Remove unused metrics reporter initialization in
KafkaAdminClient (#20166)
The `AdminClient` adds a telemetry reporter to the metrics reporters
list in the constructor. The problem is that the reporter was already
added in the `createInternal` method. In the `createInternal` method
call, the `clientTelemetryReporter` is added to a
`List<MetricsReporter>` which is passed to the `Metrics` object and
therefore gets closed when `Metrics.close()` is called. But the
reporters list built in the constructor is never passed to the
`Metrics` object, so a reporter added to it doesn't get closed,
causing a memory leak.
All related tests pass after this change.
Reviewers: Apoorv Mittal <[email protected]>, Matthias J. Sax
<[email protected]>, Chia-Ping Tsai <[email protected]>,
Jhen-Yung Hsu <[email protected]>
---
.../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dc3164993b8..12e122b123a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -577,10 +577,12 @@ public class KafkaAdminClient extends AdminClient {
Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
+ List<MetricsReporter> reporters =
CommonClientConfigs.metricsReporters(clientId, config);
Optional<ClientTelemetryReporter> clientTelemetryReporter =
CommonClientConfigs.telemetryReporter(clientId, config);
+ clientTelemetryReporter.ifPresent(reporters::add);
try {
- metrics = new Metrics(new MetricConfig(), new LinkedList<>(),
time);
+ metrics = new Metrics(new MetricConfig(), reporters, time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time,
metadataManager, metrics,
client, null, logContext, clientTelemetryReporter);
@@ -625,9 +627,7 @@ public class KafkaAdminClient extends AdminClient {
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
- List<MetricsReporter> reporters =
CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
- this.clientTelemetryReporter.ifPresent(reporters::add);
this.metadataRecoveryStrategy =
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.partitionLeaderCache = new HashMap<>();
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);