jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r460468239
##########
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##########
@@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int,
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
+  private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

Review comment:
This comment applies to `clearAndPut`:

```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  queue.forEach(_.preempt(processor))
  queue.clear()
  put(event)
}
```

I think there is a bug here where at most one event will be processed twice: once by `_.preempt(processor)` and once by `doWork`. `doWork` dequeues without holding `putLock`, so it can take an event that `forEach` has already preempted but that `clear()` has not yet removed.

I think we can fix this concurrency bug if we use `LinkedBlockingQueue::drainTo`. E.g.

```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = {
  val preemptedEvents = ...
  // drainTo removes the events from the queue, so doWork can no longer take them.
  val queuedEvent = inLock(putLock) {
    queue.drainTo(preemptedEvents)
    put(event)
  }
  // Preempt outside the lock; these events are no longer reachable through the queue.
  preemptedEvents.forEach(_.preempt(processor))
  queuedEvent
}
```
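To make the suggestion above concrete, here is a minimal, self-contained sketch of the `drainTo` approach. It is not the Kafka code: `SketchEvent` and `SketchEventManager` are hypothetical stand-ins for `QueuedEvent` and `ControllerEventManager`, the processor argument is dropped, and the drained events are collected into a `java.util.ArrayList`, which is only one possible choice for the collection left unspecified in the comment.

```scala
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for QueuedEvent: it only records whether it was preempted.
final class SketchEvent(val name: String) {
  @volatile var preempted = false
  def preempt(): Unit = { preempted = true }
}

// Hypothetical stand-in for ControllerEventManager, reduced to the queue and its put lock.
final class SketchEventManager {
  private val putLock = new ReentrantLock()
  private val queue = new LinkedBlockingQueue[SketchEvent]

  def put(event: SketchEvent): SketchEvent = {
    putLock.lock()
    try {
      queue.put(event)
      event
    } finally putLock.unlock()
  }

  def clearAndPut(event: SketchEvent): SketchEvent = {
    // Assumption: an ArrayList is used to hold the drained events.
    val preemptedEvents = new ArrayList[SketchEvent]()
    putLock.lock()
    val queued = try {
      // drainTo removes the events from the queue, so a consumer cannot also take them.
      queue.drainTo(preemptedEvents)
      put(event) // ReentrantLock allows re-acquiring the lock on the same thread
    } finally putLock.unlock()
    // Preempt outside the lock; only this thread holds references to the drained events.
    preemptedEvents.forEach(_.preempt())
    queued
  }

  // Consumer side, as a non-blocking poll so the sketch is easy to exercise.
  def poll(): Option[SketchEvent] = Option(queue.poll())

  def size: Int = queue.size
}
```

With this shape, each event is either drained and preempted by `clearAndPut` or taken by the consumer, never both, because removal from the queue happens before any `preempt` call.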