gharris1727 commented on PR #14762:
URL: https://github.com/apache/kafka/pull/14762#issuecomment-1816905171

   > What was the conclusion here? Is triggering wakeup on a separate thread
   > (through WorkerSinkTask::stop) before also eventually closing the consumer on a
   > separate thread (through WorkerSinkTask::cancel) safe? Looks like it probably
   > might be, but I was only able to take a cursory look.
   
   I don't think it is. For example, it looks like doCommitSync catches the
WakeupException and then makes another call to the consumer, and that call
could still be in progress when cancel()/close() is called. I also think that
after wakeup() returns, there's no guarantee that the other thread has actually
thrown the WakeupException and released the consumer lock. And since the plugin
has access to the WorkerSinkTaskContext, it can also run an infinite loop that
catches every WakeupException.
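
   To make the second point concrete, here is a small self-contained simulation. It is not Kafka code: `GuardedConsumer`, its latch-based `commitSync`, and the local `WakeupException` are hypothetical stand-ins, and the single-thread guard only models the consumer's behavior of throwing ConcurrentModificationException on concurrent access. The "herder" calls `wakeup()` and then `close()` while the "task thread" is still inside a consumer call; `close()` fails because `wakeup()` returned before the task thread released the consumer.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class WakeupRace {
    // Hypothetical stand-in for Kafka's WakeupException (not the real class).
    static class WakeupException extends RuntimeException {}

    // Hypothetical consumer with a single-thread guard: any thread that enters
    // while another thread is inside gets a ConcurrentModificationException.
    static class GuardedConsumer {
        private final AtomicReference<Thread> owner = new AtomicReference<>();
        private final AtomicBoolean wakeupRequested = new AtomicBoolean();

        private void acquire() {
            if (!owner.compareAndSet(null, Thread.currentThread()))
                throw new ConcurrentModificationException("consumer is in use by another thread");
        }

        private void release() {
            owner.set(null);
        }

        // A blocking call; the latches stand in for "waiting on the broker".
        void commitSync(CountDownLatch entered, CountDownLatch proceed) throws InterruptedException {
            acquire();
            try {
                entered.countDown();
                proceed.await();
                if (wakeupRequested.getAndSet(false)) throw new WakeupException();
            } finally {
                release();
            }
        }

        // Callable from any thread: it only sets a flag and returns
        // without waiting for the owning thread to notice it.
        void wakeup() {
            wakeupRequested.set(true);
        }

        void close() {
            acquire(); // throws CME if another thread still owns the consumer
            release();
        }
    }

    static String run() throws InterruptedException {
        GuardedConsumer consumer = new GuardedConsumer();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch proceed = new CountDownLatch(1);

        // "Task thread": blocked inside a consumer call.
        Thread task = new Thread(() -> {
            try {
                consumer.commitSync(entered, proceed);
            } catch (WakeupException e) {
                // A retrying handler (like the doCommitSync case) would call the consumer again here.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        entered.await(); // the task thread now owns the consumer

        // "Herder thread": stop() wakes the consumer, then cancel() closes it.
        consumer.wakeup(); // returns immediately
        String result;
        try {
            consumer.close();
            result = "closed";
        } catch (ConcurrentModificationException e) {
            result = "CME";
        }
        proceed.countDown(); // let the task thread observe the wakeup and exit
        task.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "CME"
    }
}
```

The latches make the interleaving deterministic; in a real worker the same outcome just depends on timing.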
   
   I think that preventing ConcurrentModificationExceptions completely will
require heavy synchronization, which gives the task thread more opportunities
to block the herder thread. If we assume that the framework has no infinite
loops (which may not be the case, given the behavior of doCommitSync), we could
allow cancellation only while the task thread is executing plugin code, when the
consumer is unlikely to be in use. This could be done with an AtomicReference,
which is the same synchronization primitive that the Consumer uses to fire
the ConcurrentModificationExceptions.
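
   A rough sketch of how that could look, under stated assumptions: `CancellationGate` and `Phase` are hypothetical names, not part of WorkerSinkTask. A single AtomicReference records whether the task thread is in framework or plugin code, and every transition is a compare-and-set, so the herder can only move the task to CANCELLED while the task thread is inside plugin code. What the herder should do when `tryCancel()` returns false (retry, or leave the close to the task thread) is left open, and like the proposal it only helps if framework code doesn't loop indefinitely.

```java
import java.util.concurrent.atomic.AtomicReference;

class CancellationGate {
    // Where the task thread currently is, from the herder's point of view.
    enum Phase { FRAMEWORK, PLUGIN, CANCELLED }

    private final AtomicReference<Phase> phase = new AtomicReference<>(Phase.FRAMEWORK);

    // Task thread: call before handing control to plugin code.
    // Returns false if the task was already cancelled.
    boolean enterPlugin() {
        return phase.compareAndSet(Phase.FRAMEWORK, Phase.PLUGIN);
    }

    // Task thread: call after plugin code returns. Returns false if the herder
    // cancelled while the plugin was running; the task thread must then leave
    // the consumer alone, because the herder may be closing it.
    boolean exitPlugin() {
        return phase.compareAndSet(Phase.PLUGIN, Phase.FRAMEWORK);
    }

    // Herder thread: succeeds only while the task thread is in plugin code,
    // i.e. while the framework is presumably not using the consumer.
    boolean tryCancel() {
        return phase.compareAndSet(Phase.PLUGIN, Phase.CANCELLED);
    }

    public static void main(String[] args) {
        CancellationGate gate = new CancellationGate();
        System.out.println(gate.tryCancel());   // false: task thread is in framework code
        System.out.println(gate.enterPlugin()); // true
        System.out.println(gate.tryCancel());   // true: herder may now close the consumer
        System.out.println(gate.exitPlugin());  // false: task thread must not touch the consumer
        System.out.println(gate.enterPlugin()); // false: stays cancelled
    }
}
```

Because enum constants are singletons, compareAndSet's identity comparison is exactly the state check we want here.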
   
   I'll look into it more. I don't think we should merge this change until we
have a handle on the CMEs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.