This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new f35f94b3e61 KAFKA-19504: Remove unused metrics reporter initialization
in KafkaAdminClient (#20166)
f35f94b3e61 is described below
commit f35f94b3e619f7a55abc2350f852153323d8f3b9
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Jul 14 20:19:16 2025 -0400
KAFKA-19504: Remove unused metrics reporter initialization in
KafkaAdminClient (#20166)
The `AdminClient` adds a telemetry reporter to the metrics reporters
list in the constructor. The problem is that the reporter was already
added in the `createInternal` method. In the `createInternal` method
call, the `clientTelemetryReporter` is added to a
`List<MetricReporters>` which is passed to the `Metrics` object, will
get closed when `Metrics.close()` is called. But adding a reporter to
the reporters list in the constructor is not used by the `Metrics`
object and hence doesn't get closed, causing a memory leak.
All related tests pass after this change.
Reviewers: Apoorv Mittal <[email protected]>, Matthias J. Sax
<[email protected]>, Chia-Ping Tsai <[email protected]>,
Jhen-Yung Hsu <[email protected]>
---
.../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index b283d65cbee..843509eb0a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -579,10 +579,12 @@ public class KafkaAdminClient extends AdminClient {
Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
+ List<MetricsReporter> reporters =
CommonClientConfigs.metricsReporters(clientId, config);
Optional<ClientTelemetryReporter> clientTelemetryReporter =
CommonClientConfigs.telemetryReporter(clientId, config);
+ clientTelemetryReporter.ifPresent(reporters::add);
try {
- metrics = new Metrics(new MetricConfig(), new LinkedList<>(),
time);
+ metrics = new Metrics(new MetricConfig(), reporters, time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time,
metadataManager, metrics,
client, null, logContext, clientTelemetryReporter);
@@ -627,9 +629,7 @@ public class KafkaAdminClient extends AdminClient {
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
- List<MetricsReporter> reporters =
CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
- this.clientTelemetryReporter.ifPresent(reporters::add);
this.metadataRecoveryStrategy =
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.partitionLeaderCache = new HashMap<>();
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);