artemlivshits commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156501035


##########
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##########
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   The intent here is that requestManagers cannot be updated after the sender 
starts the thread, but I wonder if we actually honor this invariant in our 
bootstrap sequence.  At the very least, we should set a flag when the thread 
starts and throw an exception if anything tries to update requestManagers 
after that point.  Alternatively, we could fully synchronize access to 
requestManagers (or use a ConcurrentLinkedQueue), which would allow 
requestManagers to be modified after the sender thread has started.
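
   To make the two options concrete, here is a rough sketch, not a proposal 
for the final code.  RequestManager, GuardedSender, ConcurrentSender and 
markStarted are hypothetical names used only for illustration; they are not 
part of this PR.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the PR's InterBrokerRequestManager.
trait RequestManager { def name: String }

// Option 1: fail fast if a manager is registered after the thread has started.
class GuardedSender {
  private val requestManagers = new ArrayBuffer[RequestManager]()
  private var started = false

  def addRequestManager(manager: RequestManager): Unit = synchronized {
    if (started)
      throw new IllegalStateException(
        "Cannot add a request manager after the sender thread has started")
    requestManagers += manager
  }

  // Assumed to be called once, right before the sender thread is started.
  def markStarted(): Unit = synchronized { started = true }

  // Returns an immutable snapshot so callers never see concurrent mutation.
  def managers: List[RequestManager] = synchronized { requestManagers.toList }
}

// Option 2: allow late registration by using a concurrent collection.
class ConcurrentSender {
  private val requestManagers = new ConcurrentLinkedQueue[RequestManager]()

  def addRequestManager(manager: RequestManager): Unit = requestManagers.add(manager)

  def managerCount: Int = requestManagers.size()
}
```

   Option 1 turns a violated invariant into an immediate error during 
bootstrap; option 2 drops the invariant and lets the sender thread pick up 
managers that were added late.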



##########
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##########
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+  
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   I don't think this can be updated and read concurrently: all updates 
happen in the startup thread before the object becomes visible to request 
threads.  We can add a code comment to make that explicit.


