gharris1727 commented on PR #14762:
URL: https://github.com/apache/kafka/pull/14762#issuecomment-1816905171
> What was the conclusion here? Is triggering wakeup on a separate thread (through WorkerSinkTask::stop) before also eventually closing the consumer on a separate thread (through WorkerSinkTask::cancel) safe? Looks like it probably might be, but I was only able to take a cursory look.

I don't think it is. For example, it looks like `doCommitSync` catches the `WakeupException` and then makes another call to the consumer, and that call could still be in progress when `cancel()`/`close()` is called. I also think that after `wakeup()` returns, there's no guarantee that the other thread has actually thrown the `WakeupException` and released the consumer lock.

Since the plugin has access to the `WorkerSinkTaskContext`, it can also run an infinite loop that catches every `WakeupException`.

I think that preventing `ConcurrentModificationException`s completely will require heavy synchronization, which gives the task thread more opportunities to block the herder thread.

If we assume that the framework has no infinite loops (which may not be the case, given the behavior of `doCommitSync`), we could allow cancellation only while the task thread is in plugin code, where the consumer is unlikely to be in use. This could be done with an `AtomicReference`, which is the same synchronization primitive that the Consumer is using to fire the `ConcurrentModificationException`s.

I'll look into it more. I don't think we should merge this change until we have a handle on the CMEs.
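To make the `doCommitSync`/`wakeup()` race above concrete, here is a minimal, self-contained sketch of a single-thread ownership check in the style of the consumer's light-weight lock. It is not the actual `KafkaConsumer` code: `OwnershipGuard`, `acquire()`, `release()` and `concurrentAcquireThrows()` are hypothetical names, and the sketch only assumes a compare-and-set on the owning thread plus a reentrancy count. One thread holds the guard (standing in for the task thread still inside a commit call after a wakeup), and a second thread tries to enter it (standing in for the herder closing the consumer), which fails with a `ConcurrentModificationException`.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-thread ownership guard, loosely modeled on the consumer's
// light-weight lock; not the actual KafkaConsumer implementation.
class OwnershipGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>(null);
    private final AtomicInteger depth = new AtomicInteger(0);

    // Claims the guard for the calling thread, or throws (instead of blocking) if another thread holds it.
    void acquire() {
        Thread me = Thread.currentThread();
        if (owner.get() != me && !owner.compareAndSet(null, me)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    // Releases one level of ownership; the guard is free again once depth reaches zero.
    void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(null);
        }
    }

    // Returns true if a second thread's acquire() fails while the first thread still holds the guard.
    static boolean concurrentAcquireThrows() throws InterruptedException {
        OwnershipGuard guard = new OwnershipGuard();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            guard.acquire();          // task thread: e.g. still inside a commit call
            try {
                held.countDown();
                finish.await();       // keep holding the guard
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                guard.release();
            }
        });
        task.start();
        held.await();                 // wait until the task thread owns the guard
        boolean threw = false;
        try {
            guard.acquire();          // other thread: e.g. close() reached through cancel()
            guard.release();
        } catch (ConcurrentModificationException e) {
            threw = true;
        }
        finish.countDown();
        task.join();
        return threw;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("concurrent acquire threw CME: " + concurrentAcquireThrows());
    }
}
```

Note that `wakeup()` itself does not need this guard, which is why it can be called from another thread; the failure only shows up when the second thread makes a guarded call while the first thread has not yet released.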
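The cancellation proposal above could be sketched with a single `AtomicReference` that records which kind of code the task thread is running. This is a minimal sketch under stated assumptions, not the Connect implementation: `CancellationGate`, its `Phase` values and its methods are hypothetical names. The task thread flips the phase around each call into plugin code, and the herder may only move the gate to `CANCELLED` (and then close the consumer) if the task thread is currently inside plugin code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical gate: the herder may cancel (and close the consumer) only while the
// task thread is inside plugin code. Names and phases are illustrative only.
class CancellationGate {
    enum Phase { FRAMEWORK, PLUGIN, CANCELLED }

    private final AtomicReference<Phase> phase = new AtomicReference<>(Phase.FRAMEWORK);

    // Task thread: call before invoking plugin code. False means the task was already cancelled.
    boolean enterPlugin() {
        return phase.compareAndSet(Phase.FRAMEWORK, Phase.PLUGIN);
    }

    // Task thread: call after plugin code returns. False means the herder cancelled the task
    // in the meantime, so the task thread must not touch the consumer again.
    boolean exitPlugin() {
        return phase.compareAndSet(Phase.PLUGIN, Phase.FRAMEWORK);
    }

    // Herder thread: succeeds only if the task thread is currently in plugin code.
    // On success the herder takes over the consumer and may close it.
    boolean tryCancel() {
        return phase.compareAndSet(Phase.PLUGIN, Phase.CANCELLED);
    }

    boolean isCancelled() {
        return phase.get() == Phase.CANCELLED;
    }

    public static void main(String[] args) {
        CancellationGate gate = new CancellationGate();
        System.out.println("cancel during framework code: " + gate.tryCancel());
        gate.enterPlugin();
        System.out.println("cancel during plugin code: " + gate.tryCancel());
        System.out.println("task may resume framework code: " + gate.exitPlugin());
    }
}
```

A failed `tryCancel()` never blocks: the herder simply leaves the consumer alone (or tries again later), so the task thread cannot stall the herder. The trade-off is the one noted above: a task stuck in framework code is never cancelled this way, and because plugin code may still reach the consumer indirectly (for example through `WorkerSinkTaskContext`), the consumer is only unlikely, not guaranteed, to be idle when cancellation succeeds.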