[
https://issues.apache.org/jira/browse/KAFKA-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ewen Cheslack-Postava updated KAFKA-2867:
-----------------------------------------
Status: Patch Available (was: Open)
> Missing synchronization and improperly handled InterruptException in
> WorkerSourceTask
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-2867
> URL: https://issues.apache.org/jira/browse/KAFKA-2867
> Project: Kafka
> Issue Type: Bug
> Components: copycat
> Reporter: Ewen Cheslack-Postava
> Assignee: Ewen Cheslack-Postava
> Priority: Blocker
> Fix For: 0.9.0.0, 0.9.1.0
>
>
> In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case
> (if the flush didn't even have to be started), this is ok because we are
> already in a synchronized block. However, the other case is outside the
> synchronized block.
> This caused transient failures of the system test for clean bouncing of
> copycat nodes. The bug doesn't cause exceptions because
> finishSuccessfulFlush() only swaps two maps and sets a flag to false.
> However, because the two swapped maps track outstanding messages, the swap
> can race with a message that is just starting to be sent. If that message
> is added to the backlog queue and the flushing flag is then toggled, the
> message is temporarily "lost" in the backlog queue. We then get a callback
> that logs an error because it can't find a record of the acked message
> (which, if it ever appears, should be considered a critical issue since it
> shouldn't be possible), and on the next commit the message is swapped
> *back into place*. On the subsequent commit, the flush can never complete
> because the message is in the outstanding list but has already been acked.
> This, in turn, makes it impossible to commit offsets and results in
> duplicate messages even under clean bounces, where we should be able to get
> exactly-once delivery assuming no network delays or other issues.
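
The race described above can be illustrated with a minimal sketch. This is not the actual WorkerSourceTask code: the class FlushSketch, the methods recordSent(), recordAcked() and beginFlush(), and the use of integer ids as map keys are all hypothetical simplifications. The point it tries to show is that the map swap and the flag reset need to happen under the same lock that guards the send and ack paths.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the outstanding/backlog bookkeeping.
// Names other than finishSuccessfulFlush() are assumptions for illustration.
final class FlushSketch {
    // Records sent before the current flush started and not yet acked.
    private Map<Integer, String> outstanding = new HashMap<>();
    // Records sent while a flush is in progress.
    private Map<Integer, String> backlog = new HashMap<>();
    private boolean flushing = false;

    // Send path: decides which map tracks the record based on the flag.
    public synchronized void recordSent(int id, String value) {
        if (flushing) {
            backlog.put(id, value);
        } else {
            outstanding.put(id, value);
        }
    }

    // Ack path: returns false if no record of the message can be found.
    public synchronized boolean recordAcked(int id) {
        return outstanding.remove(id) != null || backlog.remove(id) != null;
    }

    public synchronized void beginFlush() {
        flushing = true;
    }

    // Synchronized so the swap and the flag reset are atomic with respect to
    // recordSent(); without the lock, a concurrent send could read the flag
    // and pick a map in the middle of the swap.
    public synchronized void finishSuccessfulFlush() {
        Map<Integer, String> tmp = outstanding;
        outstanding = backlog;
        backlog = tmp;
        flushing = false;
    }

    public synchronized int outstandingCount() {
        return outstanding.size();
    }

    public synchronized int backlogCount() {
        return backlog.size();
    }

    public static void main(String[] args) {
        FlushSketch s = new FlushSketch();
        s.recordSent(1, "a");
        s.beginFlush();
        s.recordSent(2, "b");      // tracked in the backlog during the flush
        s.recordAcked(1);          // flush can now complete
        s.finishSuccessfulFlush(); // backlog becomes the new outstanding map
        System.out.println("outstanding=" + s.outstandingCount()
                + " backlog=" + s.backlogCount());
    }
}
```

In this sketch every method that touches the maps or the flag takes the same monitor, so a record can only ever land in the map that matches the flag's current value.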
> As a result of seeing this error, it became apparent that handling of
> WorkerSourceTaskThreads that do not complete quickly enough was not working
> properly. The ShutdownableThread should get interrupted if it does not
> complete quickly enough, but logs like this would happen:
> {quote}
> [2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread
> WorkerSourceTask-verifiable-source-0
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,897] INFO Forcing shutdown of thread
> WorkerSourceTask-verifiable-source-0
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:18,897] ERROR Graceful stop of task
> WorkerSourceTask{id=verifiable-source-0} failed.
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:18,897] ERROR Failed to flush
> WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for
> producer to flush outstanding messages
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for
> rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> {quote}
> Actions in the background thread performing the commit continue to occur
> after it is supposedly interrupted. This is because InterruptedExceptions
> during the flush were being ignored (some time ago they were not even
> possible). Instead, any interruption by the main thread trying to shut down
> the thread in preparation for a rebalance should be handled by failing the
> commit operation and returning so the thread can exit cleanly.