jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r460468239



##########
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##########
@@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int,
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
+  private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

Review comment:
       This comment applies to `clearAndPut`:
   
   ```scala
     def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
       queue.forEach(_.preempt(processor))
       queue.clear()
       put(event)
     }
   ```
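   I think there is a bug here where at most one event will be processed twice:
   once by `_.preempt(processor)` and once by `doWork`. As far as I can tell,
   `putLock` only serializes producers, so the event thread can still take an
   event off the queue between the `forEach` and the `clear()`. I think we can
   fix this concurrency bug if we use `LinkedBlockingQueue::drainTo`. E.g.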
   
   ```scala
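     def clearAndPut(event: ControllerEvent): QueuedEvent = {
       val preemptedEvents = ...;
   
       // Drain and enqueue under the lock; keep the new QueuedEvent to return it.
       val queuedEvent = inLock(putLock) {
         queue.drainTo(preemptedEvents)
         put(event)
       }
   
       // Preempt outside the lock: drained events can no longer be taken by doWork.
       preemptedEvents.forEach(_.preempt(processor))
       queuedEvent
     }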
   ```
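
   To make the idea concrete, here is a minimal, self-contained sketch of the
   `drainTo` approach. `DemoEvent` and `DemoEventQueue` are hypothetical
   stand-ins, not the real `QueuedEvent` or `ControllerEventManager`, and the
   `ArrayList` buffer is my assumption for the elided collection above. The
   point is that `drainTo` removes the elements from the queue, so every event
   leaves it exactly once, either through `take()` or through the drain:

   ```scala
   import java.util.ArrayList
   import java.util.concurrent.LinkedBlockingQueue
   import java.util.concurrent.atomic.AtomicInteger
   import java.util.concurrent.locks.ReentrantLock

   // Hypothetical stand-in for QueuedEvent: it only counts preemptions.
   final class DemoEvent(val name: String) {
     val preemptCount = new AtomicInteger(0)
     def preempt(): Unit = preemptCount.incrementAndGet()
   }

   // Hypothetical stand-in for the queue-handling part of the event manager.
   final class DemoEventQueue {
     private val putLock = new ReentrantLock()
     private val queue = new LinkedBlockingQueue[DemoEvent]

     def put(event: DemoEvent): DemoEvent = {
       putLock.lock()
       try {
         queue.put(event)
         event
       } finally putLock.unlock()
     }

     def clearAndPut(event: DemoEvent): DemoEvent = {
       // Assumed buffer type; drainTo accepts any java.util.Collection.
       val preempted = new ArrayList[DemoEvent]()

       putLock.lock()
       val queued = try {
         // Moves every pending event out of the queue into `preempted`.
         queue.drainTo(preempted)
         // ReentrantLock allows the nested acquisition inside put().
         put(event)
       } finally putLock.unlock()

       // Only events that were actually drained get preempted.
       preempted.forEach(e => e.preempt())
       queued
     }

     def take(): DemoEvent = queue.take()
     def size: Int = queue.size
   }
   ```

   With two pending events, `clearAndPut` should preempt each of them once and
   leave only the new event for the consumer thread to `take()`.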





