[ https://issues.apache.org/jira/browse/KAFKA-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ewen Cheslack-Postava updated KAFKA-2867:
-----------------------------------------
    Status: Patch Available  (was: Open)

> Missing synchronization and improperly handled InterruptException in WorkerSourceTask
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2867
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2867
>             Project: Kafka
>          Issue Type: Bug
>          Components: copycat
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>            Priority: Blocker
>             Fix For: 0.9.0.0, 0.9.1.0
>
>
> In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case
> (if the flush didn't even have to be started), this is OK because we are
> already in a synchronized block. However, the other case is outside the
> synchronized block.
> The result of this was transient failures of the system test for clean
> bouncing of copycat nodes. The bug doesn't cause exceptions because
> finishSuccessfulFlush() only swaps two maps and sets a flag to false.
> However, because the swap involves the two maps that maintain outstanding
> messages, we could by chance also be starting to send a message at the same
> time. If the message accidentally gets added to the backlog queue and the
> flushing flag is then toggled, we can "lose" that message temporarily in the
> backlog queue. Then we'll get a callback that logs an error because it can't
> find a record of the acked message (which, if it ever appears, should be
> considered a critical issue since it shouldn't be possible), and on the next
> commit, the message will be swapped *back into place*. On the subsequent
> commit, the flush will never be able to complete because the message will be
> in the outstanding list but will already have been acked. This, in turn,
> makes it impossible to commit offsets and results in duplicate messages even
> under clean bounces, where we should be able to get exactly-once delivery
> assuming no network delays or other issues.
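The race described above can be illustrated with a minimal Java sketch. This is not the actual WorkerSourceTask code: the class name, every method name other than finishSuccessfulFlush(), and the choice of IdentityHashMap are assumptions made for illustration. What it tries to show is that the swap and the flag reset take the same lock as the send and ack paths, so a record cannot be filed into a map while the two maps are being exchanged.

```java
import java.util.IdentityHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of the bookkeeping described in the issue.
 * All names except finishSuccessfulFlush() are illustrative assumptions.
 */
class FlushBookkeepingSketch {
    // Records sent but not yet acked, tracked by object identity.
    private Map<Object, Object> outstanding = new IdentityHashMap<>();
    // Records sent while a flush is in progress.
    private Map<Object, Object> backlog = new IdentityHashMap<>();
    private boolean flushing = false;

    /** Called on the send path; chooses a map based on the flushing flag. */
    synchronized void recordSent(Object record) {
        if (flushing) {
            backlog.put(record, record);
        } else {
            outstanding.put(record, record);
        }
    }

    /** Called from the producer callback; returns false if the record is unknown. */
    synchronized boolean recordAcked(Object record) {
        if (outstanding.remove(record) != null) {
            return true;
        }
        return backlog.remove(record) != null;
    }

    synchronized void beginFlush() {
        flushing = true;
    }

    /**
     * Swaps the two maps and clears the flag. Holding the same monitor as
     * recordSent() means the flag check and the put cannot interleave with
     * the swap.
     */
    synchronized void finishSuccessfulFlush() {
        Map<Object, Object> previous = outstanding;
        outstanding = backlog;
        backlog = previous;
        flushing = false;
    }

    synchronized int outstandingCount() {
        return outstanding.size();
    }

    synchronized int backlogCount() {
        return backlog.size();
    }
}
```

In this sketch a record sent during a flush sits in the backlog until finishSuccessfulFlush() promotes it to the outstanding map. Without the synchronized keyword on that method, recordSent() could read the flag and pick a map just as the maps change roles, which is the kind of misfiling the issue describes.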
> As a result of seeing this error, it became apparent that the handling of
> WorkerSourceTaskThreads that do not complete quickly enough was not working
> properly. The ShutdownableThread should get interrupted if it does not
> complete quickly enough, but logs like the following were produced:
> {quote}
> [2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0 (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread WorkerSourceTask-verifiable-source-0 (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,897] INFO Forcing shutdown of thread WorkerSourceTask-verifiable-source-0 (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:18,897] ERROR Graceful stop of task WorkerSourceTask{id=verifiable-source-0} failed. (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:18,897] ERROR Failed to flush WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for producer to flush outstanding messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> {quote}
> Actions in the background thread performing the commit continue to occur
> after it is supposedly interrupted. This is because InterruptedExceptions
> during the flush were being ignored (some time ago they were not even
> possible). Instead, any interruption by the main thread trying to shut down
> the thread in preparation for a rebalance should be handled by failing the
> commit operation and returning so the thread can exit cleanly.
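The interrupt handling proposed above could look roughly like the following Java sketch. It is a simplified stand-in, not the real WorkerSourceTask: the class, its counter-based bookkeeping, and the method names are assumptions for illustration. The commit waits for outstanding records to be acked, and if the waiting thread is interrupted it fails the commit and returns immediately instead of swallowing the exception and carrying on.

```java
/**
 * Hypothetical sketch of a commit that treats interruption as failure.
 * Names and structure are illustrative assumptions, not Kafka's code.
 */
class InterruptAwareCommitSketch {
    private int outstanding = 0;
    private boolean flushing = false;

    synchronized void recordSent() {
        outstanding++;
    }

    synchronized void recordAcked() {
        outstanding--;
        if (outstanding == 0) {
            notifyAll(); // wake a commit waiting for the flush to drain
        }
    }

    /**
     * Waits up to timeoutMs for all outstanding records to be acked.
     * Returns true if the flush completed, false on timeout or interruption.
     */
    synchronized boolean commit(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        flushing = true;
        while (outstanding > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                flushing = false;
                return false; // timed out, commit fails
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                // Shutdown was requested: fail the commit and return right away.
                flushing = false;
                // Restoring the flag is a choice made in this sketch so that
                // callers can still observe the interruption.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        flushing = false;
        return true;
    }

    synchronized boolean isFlushing() {
        return flushing;
    }
}
```

With this shape, a thread that is interrupted while waiting in commit() stops doing commit work at once, which is the behavior the log excerpt shows was missing.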