[ 
https://issues.apache.org/jira/browse/KAFKA-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2867:
-----------------------------------------
    Status: Patch Available  (was: Open)

> Missing synchronization and improperly handled InterruptException in 
> WorkerSourceTask
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2867
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2867
>             Project: Kafka
>          Issue Type: Bug
>          Components: copycat
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>            Priority: Blocker
>             Fix For: 0.9.0.0, 0.9.1.0
>
>
> In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case 
> (when the flush didn't even need to be started) this is fine because the call 
> is already inside a synchronized block; however, the other call site is 
> outside any synchronized block.
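> A minimal sketch of the state involved (class name and types are simplified; 
> the real WorkerSourceTask keys the maps by ProducerRecord and has more state 
> and error handling), with the method made synchronized to close the window:
> {code:java}
> import java.util.IdentityHashMap;
> import java.util.Map;
>
> // Illustrative sketch only, not the exact source.
> class FlushStateSketch {
>     private Map<Object, Object> outstandingMessages = new IdentityHashMap<>();
>     private Map<Object, Object> outstandingMessagesBacklog = new IdentityHashMap<>();
>     private boolean flushing = false;
>
>     // Swaps the two maps and clears the flushing flag. Because one call site
>     // ran this without holding the task's monitor, the swap could interleave
>     // with a concurrent send; making it synchronized closes that window.
>     private synchronized void finishSuccessfulFlush() {
>         Map<Object, Object> temp = outstandingMessages;
>         outstandingMessages = outstandingMessagesBacklog;
>         outstandingMessagesBacklog = temp;
>         flushing = false;
>     }
> }
> {code}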
> The result was transient failures of the system test that cleanly bounces 
> copycat nodes. The bug doesn't cause exceptions because finishSuccessfulFlush() 
> only swaps two maps and sets a flag to false. However, while the two maps that 
> track outstanding messages are being swapped, another thread may, by chance, be 
> in the middle of sending a message. If that message is added to the backlog map 
> just as the flushing flag is toggled, we can temporarily "lose" the message in 
> the backlog. The producer callback then logs an error because it can't find a 
> record of the acked message (which, if it ever appears, should be treated as a 
> critical issue since it shouldn't be possible), and on the next commit the 
> message is swapped *back into place*. On the subsequent commit, the flush can 
> never complete because the message is still in the outstanding list even though 
> it has already been acked. This, in turn, makes it impossible to commit offsets 
> and results in duplicate messages even under clean bounces, where we should get 
> exactly-once delivery assuming no network delays or other issues.
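> Continuing the sketch above, the send/ack bookkeeping looks roughly like this 
> (method names here are illustrative, not the exact source); note that the 
> locking on these paths does not help when the swap itself runs without the lock:
> {code:java}
>     // Called when a record is handed to the producer. During a flush, new
>     // records go to the backlog so the in-flight flush only waits on records
>     // sent before it started.
>     private synchronized void recordSendStarted(Object record) {
>         if (flushing)
>             outstandingMessagesBacklog.put(record, record);
>         else
>             outstandingMessages.put(record, record);
>     }
>
>     // Producer callback: the acked record should be in the active map. If the
>     // unsynchronized swap raced with recordSendStarted(), the record may have
>     // been routed to the map that was just swapped out, so the lookup fails
>     // here and the record reappears in the active map after the next swap.
>     private synchronized void recordAcked(Object record) {
>         if (outstandingMessages.remove(record) == null)
>             System.err.println("Saw ack for a record with no outstanding entry: " + record);
>         notifyAll(); // wake a commit that is waiting for the flush to drain
>     }
> {code}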
> While investigating this error, it became apparent that the handling of 
> WorkerSourceTaskThreads that do not shut down quickly enough was not working 
> properly. The ShutdownableThread should be interrupted if it does not finish 
> in time, but logs like the following would appear:
> {quote}
> [2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0 
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread 
> WorkerSourceTask-verifiable-source-0 
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0} 
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store 
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,897] INFO Forcing shutdown of thread 
> WorkerSourceTask-verifiable-source-0 
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:18,897] ERROR Graceful stop of task 
> WorkerSourceTask{id=verifiable-source-0} failed. 
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:18,897] ERROR Failed to flush 
> WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for 
> producer to flush outstanding messages 
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store 
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for 
> rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> {quote}
> The background thread performing the commit keeps doing work after it has 
> supposedly been interrupted. This is because InterruptedExceptions during the 
> flush were being ignored (some time ago they were not even possible at that 
> point). Instead, an interrupt from the main thread trying to shut the task 
> down in preparation for a rebalance should fail the commit operation and 
> return, so the thread can exit cleanly.
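> A hedged sketch of the intended handling, again continuing the sketch class 
> above (the explicit timeout parameter and wait/notify structure are 
> simplifications; the real commit path goes through the OffsetStorageWriter):
> {code:java}
>     // Commit path (simplified): wait for outstanding messages to be acked. If
>     // the worker thread is interrupted while waiting, abort the commit and
>     // return so the thread can exit, rather than swallowing the interrupt.
>     public synchronized boolean commitOffsets(long timeoutMs) {
>         flushing = true;
>         long deadline = System.currentTimeMillis() + timeoutMs;
>         try {
>             while (!outstandingMessages.isEmpty()) {
>                 long remaining = deadline - System.currentTimeMillis();
>                 if (remaining <= 0)
>                     return false; // timed out waiting for the producer to flush
>                 wait(remaining);  // recordAcked() calls notifyAll() as the map drains
>             }
>         } catch (InterruptedException e) {
>             // Abort the commit: reset the flushing state, restore the interrupt
>             // status, and return so ShutdownableThread can finish stopping the task.
>             flushing = false;
>             Thread.currentThread().interrupt();
>             return false;
>         }
>         finishSuccessfulFlush();
>         return true;
>     }
> {code}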



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
