divijvaidya commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1122955794


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -606,7 +607,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
     newPort
   }
 
-  private[network] val processors = new ArrayBuffer[Processor]()
+  private[network] val processors = new CopyOnWriteArrayList[Processor]()

Review Comment:
   It would be nice if you could add a comment here explaining why we chose this data structure. Anyone who reads this code in the future will then have a clear explanation of the choices and tradeoffs we made.
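   
   As a rough illustration of the tradeoff such a comment could document (a standalone sketch, not code from this PR; `CowIterationSketch` and `countWhileMutating` are hypothetical names): an iterator over a `CopyOnWriteArrayList` walks the snapshot that existed when the iterator was created, so a write during iteration neither throws `ConcurrentModificationException` nor becomes visible to that iterator. The price is that every write copies the backing array.

```scala
import java.util.concurrent.CopyOnWriteArrayList

object CowIterationSketch {
  // Hypothetical helper: counts the elements seen by one iterator while
  // appending to the same list on every step of the iteration.
  def countWhileMutating(list: CopyOnWriteArrayList[String]): Int = {
    var seen = 0
    val it = list.iterator() // snapshot of the list at this moment
    while (it.hasNext) {
      it.next()
      seen += 1
      // Each write copies the backing array; the iterator above is unaffected.
      list.add("added-during-iteration")
    }
    seen
  }
}
```

   This suits a list that is read often (for example, on every metric poll) and written rarely, which is presumably the case for `processors`.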



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -141,8 +142,8 @@ class SocketServer(val config: KafkaConfig,
   }
   newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
  newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)

Review Comment:
   We should avoid converting `processors` with `asScala` here, since that wraps it in a mutable Scala `Buffer`.
   
   We probably don't need Scala transformations here. Could you please try 
something like this:
   `dataPlaneAcceptors.values.stream.flatMap(a => a.processors.stream)`
   
   (same comment for other places such as line 119)
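   
   A minimal sketch of the stream-based form, assuming Scala 2.12 or later so that lambdas convert to `java.util.function` interfaces. `FakeProcessor`, `FakeAcceptor`, and `totalExpiredConnectionsKilled` are hypothetical stand-ins for illustration, not Kafka's real classes or methods:

```scala
import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList}

object StreamFlatMapSketch {
  // Hypothetical stand-ins for Processor and Acceptor; not Kafka's real classes.
  final case class FakeProcessor(id: Int, expiredConnectionsKilled: Long)

  final class FakeAcceptor {
    val processors = new CopyOnWriteArrayList[FakeProcessor]()
  }

  // Flattens every acceptor's processors with Java streams only, so no Scala
  // collection wrapper is created on the way.
  def totalExpiredConnectionsKilled(acceptors: ConcurrentHashMap[String, FakeAcceptor]): Long =
    acceptors.values().stream()
      .flatMap[FakeProcessor]((a: FakeAcceptor) => a.processors.stream())
      .mapToLong((p: FakeProcessor) => p.expiredConnectionsKilled)
      .sum()
}
```

   The explicit type argument on `flatMap` and the parameter annotations are there only to help Scala's inference with Java's wildcard signatures.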



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -115,22 +115,23 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)
+    // copy to an immutable array to avoid concurrency issue when calculating average
     val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
       metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-    }
-    if (dataPlaneProcessors.isEmpty) {
+    }.toArray

Review Comment:
   This creates a new object on every call to this metric (which is going to happen frequently). Do we really want this?
   
   If I understand correctly, your motivation is to guard against scenarios where the number of processors changes between the time we calculate `ioWaitRatioMetricNames` and the time we calculate `dataPlaneProcessors.size`. To mitigate that, we can store the size in an int beforehand; then we don't have to convert `ioWaitRatioMetricNames` to an array.
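   
   One way to read that suggestion, as a hedged sketch rather than the actual gauge body: `averageIoWaitRatio` is a hypothetical helper, the `ratios` list stands in for the per-processor `io-wait-ratio` values, and returning 1.0 for an empty list is an assumption of this sketch.

```scala
object AvgIdleSketch {
  // Hypothetical helper: averages the ratios using a size that is read once.
  def averageIoWaitRatio(ratios: java.util.List[java.lang.Double]): Double = {
    // Read the size a single time and reuse it as the divisor, instead of
    // asking the collection for its size again after the values are summed.
    val count = ratios.size
    if (count == 0) 1.0 // assumption: treat "no processors" as fully idle
    else {
      var sum = 0.0
      val it = ratios.iterator()
      while (it.hasNext) sum += it.next().doubleValue()
      sum / count
    }
  }
}
```

   No intermediate array is allocated here; only the local `count` and the running `sum` are kept per call.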



