divijvaidya commented on code in PR #13285: URL: https://github.com/apache/kafka/pull/13285#discussion_r1122955794
########## core/src/main/scala/kafka/network/SocketServer.scala: ##########
@@ -606,7 +607,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
 newPort
 }
- private[network] val processors = new ArrayBuffer[Processor]()
+ private[network] val processors = new CopyOnWriteArrayList[Processor]()

Review Comment:
It would be nice if you could add a comment here explaining why we chose this data structure. Anyone who reads this code in the future will then have a clear explanation of the choices and trade-offs we made.

########## core/src/main/scala/kafka/network/SocketServer.scala: ##########
@@ -141,8 +142,8 @@ class SocketServer(val config: KafkaConfig,
 }
 newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
 newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
- newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
- val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+ newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => {
+ val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)

Review Comment:
We should not convert `processors` to a Scala collection here, since `asScala` exposes it as a mutable Scala `Buffer`. We probably don't need any Scala conversions here.
Could you please try something like this: `dataPlaneAcceptors.values.stream.flatMap(a => a.processors.stream)` (the same comment applies to other places, such as line 119).

########## core/src/main/scala/kafka/network/SocketServer.scala: ##########
@@ -115,22 +115,23 @@ class SocketServer(val config: KafkaConfig,
 private var stopped = false

 // Socket server metrics
- newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
- val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+ newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => {
+ val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)
+ // copy to an immutable array to avoid concurrency issue when calculating average
 val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
 metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
- }
- if (dataPlaneProcessors.isEmpty) {
+ }.toArray

Review Comment:
This creates new objects on every call to this metric, and this metric will be called frequently. Do we really want that? If I understand correctly, your motivation is to guard against the number of processors changing between the time we calculate `ioWaitRatioMetricNames` and the time we calculate `dataPlaneProcessors.size`. To mitigate that, we can store the size in a local `Int` beforehand. Then we don't have to convert `ioWaitRatioMetricNames` to an array.

-- This is an automated message from the Apache Git Service.
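The first review comment asks for a code comment explaining why `processors` is now a `CopyOnWriteArrayList`. The property that typically motivates this choice is that its iterators traverse a snapshot of the backing array. A reader such as a metrics gauge can therefore walk the list while another thread adds a processor. It needs no lock and will not hit a `ConcurrentModificationException`. The following is a minimal illustrative sketch, not Kafka code. `CowSnapshotDemo` and its `Processor` class are hypothetical names.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Illustrative sketch only (not Kafka code): demonstrates the snapshot-iteration
// guarantee of CopyOnWriteArrayList that makes it suitable for a rarely written,
// frequently read list.
object CowSnapshotDemo {
  // Hypothetical stand-in for kafka.network.Processor.
  final case class Processor(id: Int)

  /** Iterates the list, appends a processor after the first element, and returns the ids the iterator saw. */
  def idsSeenWhileAppending(processors: CopyOnWriteArrayList[Processor]): List[Int] = {
    val it = processors.iterator() // iterates the backing array as of this call
    val seen = List.newBuilder[Int]
    var appended = false
    while (it.hasNext) {
      seen += it.next().id
      if (!appended) {
        // Simulates a writer adding a processor mid-iteration: the list copies its
        // backing array, so the running iterator is unaffected and nothing is thrown.
        processors.add(Processor(99))
        appended = true
      }
    }
    seen.result()
  }

  def main(args: Array[String]): Unit = {
    val processors = new CopyOnWriteArrayList[Processor]()
    processors.add(Processor(0))
    processors.add(Processor(1))
    println(idsSeenWhileAppending(processors)) // List(0, 1): the snapshot excludes 99
    println(processors.size())                 // 3: the append did take effect
  }
}
```

The cost is on the write side, because every `add` or `remove` copies the entire backing array. That trade-off is reasonable when the list is small and modified far less often than it is read. This appears to be the situation here: processors change rarely, while gauges are polled repeatedly. That is an inference, though, not something the review states.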
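The second review comment suggests replacing the `asScala` conversions with Java streams, for example `dataPlaneAcceptors.values.stream.flatMap(a => a.processors.stream)`. A self-contained approximation of that pattern might look like the sketch below. It makes several assumptions:

- `Acceptor` and `Processor` are hypothetical classes.
- A `String` map key stands in for the real endpoint key.
- The per-processor `expiredConnectionsKilled` value is made up.

The explicit `[Processor]` type argument on `flatMap` is added to help Scala infer the result type from Java's wildcard signature.

```scala
import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList}

object StreamFlatMapSketch {
  // Hypothetical stand-ins for Kafka's Processor and DataPlaneAcceptor.
  final case class Processor(id: Int, expiredConnectionsKilled: Double)
  final class Acceptor {
    val processors = new CopyOnWriteArrayList[Processor]()
  }

  /** Sums a per-processor value across all acceptors using Java streams only (no asScala). */
  def totalExpiredConnectionsKilled(acceptors: ConcurrentHashMap[String, Acceptor]): Double =
    acceptors.values().stream()
      .flatMap[Processor](a => a.processors.stream())
      .mapToDouble(p => p.expiredConnectionsKilled)
      .sum()

  /** Counts processors across all acceptors without building an intermediate Scala collection. */
  def processorCount(acceptors: ConcurrentHashMap[String, Acceptor]): Long =
    acceptors.values().stream()
      .flatMap[Processor](a => a.processors.stream())
      .count()

  def main(args: Array[String]): Unit = {
    val acceptors = new ConcurrentHashMap[String, Acceptor]()
    val acceptor = new Acceptor
    acceptor.processors.add(Processor(0, 2.0))
    acceptor.processors.add(Processor(1, 3.0))
    acceptors.put("PLAINTEXT", acceptor)
    println(processorCount(acceptors))                // 2
    println(totalExpiredConnectionsKilled(acceptors)) // 5.0
  }
}
```

Neither source throws under concurrent modification. `ConcurrentHashMap` views are weakly consistent, and `CopyOnWriteArrayList` streams traverse a snapshot. A gauge computed this way may or may not reflect a processor added mid-computation, but it will not fail.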
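The last review comment proposes keeping the processor count in a local `Int` instead of copying `ioWaitRatioMetricNames` into an array on every metric read. One way to apply that idea is to accumulate the count in the same pass as the sum. The divisor then always matches the number of values summed, and no intermediate collection is built. The sketch below rests on two assumptions:

- `Processor.ioWaitRatio` is a hypothetical stand-in for looking up each processor's `io-wait-ratio` metric.
- Returning `None` for an empty list is an illustrative choice. The excerpt does not show what the real gauge reports in that case.

```scala
import java.util.concurrent.CopyOnWriteArrayList

object AvgIdleSketch {
  // Hypothetical stand-in: the real gauge reads each processor's "io-wait-ratio" metric.
  final case class Processor(ioWaitRatio: Double)

  /** Average io-wait ratio over a single traversal of `processors`; None if the list is empty. */
  def avgIdle(processors: CopyOnWriteArrayList[Processor]): Option[Double] = {
    var count = 0   // local Int, counted in the same pass as the sum
    var total = 0.0
    val it = processors.iterator() // one snapshot serves both the sum and the count
    while (it.hasNext) {
      total += it.next().ioWaitRatio
      count += 1
    }
    if (count == 0) None else Some(total / count)
  }

  def main(args: Array[String]): Unit = {
    val processors = new CopyOnWriteArrayList[Processor]()
    processors.add(Processor(0.5))
    processors.add(Processor(0.25))
    println(avgIdle(processors)) // Some(0.375)
  }
}
```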