AndrewJSchofield commented on code in PR #20672:
URL: https://github.com/apache/kafka/pull/20672#discussion_r2451935790
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -841,18 +841,22 @@ class DynamicMetricReporterState(brokerId: Int, config:
KafkaConfig, metrics: Me
reporters.forEach { reporter =>
metrics.addReporter(reporter)
currentReporters += reporter.getClass.getName -> reporter
- val clientTelemetryReceiver = reporter match {
- case telemetry: ClientTelemetry => telemetry.clientReceiver()
- case _ => null
- }
- if (clientTelemetryReceiver != null) {
- dynamicConfig.clientMetricsReceiverPlugin match {
- case Some(receiverPlugin) =>
- receiverPlugin.add(clientTelemetryReceiver)
- case None =>
- // Do nothing
- }
+ // Support both deprecated ClientTelemetry and new
ClientTelemetryExporterProvider interfaces
+ // If a class implements both, only use the new (i.e.,
ClientTelemetryExporterProvider interface)
+ dynamicConfig.clientTelemetryExporterPlugin match {
+ case Some(telemetryExporterPlugin) =>
+ reporter match {
+ case exporterProvider: ClientTelemetryExporterProvider =>
+ // Use new interface (i.e., takes precedence even if class also
implements deprecated interface)
+
telemetryExporterPlugin.add(exporterProvider.clientTelemetryExporter())
+ case telemetry: ClientTelemetry =>
+ telemetryExporterPlugin.add(telemetry.clientReceiver())
+ case _ =>
+ // Reporter doesn't support client telemetry
+ }
+ case None =>
+ // Do nothing
Review Comment:
I think that would be good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]