[
https://issues.apache.org/jira/browse/KAFKA-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15015243#comment-15015243
]
ASF GitHub Bot commented on KAFKA-2867:
---
GitHub user ewencp opened a pull request:
https://github.com/apache/kafka/pull/566
KAFKA-2867: Fix missing WorkerSourceTask synchronization and handling of
InterruptException.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ewencp/kafka
kafka-2867-fix-source-sync-and-interrupt
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/566.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #566
commit 3a610d4f24f9e102458f335d98c420a563c08aad
Author: Ewen Cheslack-Postava
Date: 2015-11-19T21:37:17Z
KAFKA-2867: Fix missing WorkerSourceTask synchronization and handling of
InterruptException.
> Missing synchronization and improperly handled InterruptException in
> WorkerSourceTask
> --------------------------------------------------------------------
>
> Key: KAFKA-2867
> URL: https://issues.apache.org/jira/browse/KAFKA-2867
> Project: Kafka
> Issue Type: Bug
> Components: copycat
> Reporter: Ewen Cheslack-Postava
> Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.9.0.0, 0.9.1.0
>
>
> In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case
> (when the flush didn't even have to be started), this is OK because we are
> already inside a synchronized block. However, the other call site is outside
> any synchronized block.
> The result of this was transient failures of the system test for clean
> bouncing Copycat nodes. The bug doesn't cause exceptions because
> finishSuccessfulFlush() only swaps two maps and sets a flag to false.
> However, because those two maps maintain the outstanding messages, we could
> by chance be starting to send a message at the same time. If the message is
> added to the backlog queue just before the flushing flag is toggled, we
> "lose" that message temporarily in the backlog queue. We then get a callback
> that logs an error because it can't find a record of the acked message
> (which, if it ever appears, should be considered a critical issue since it
> shouldn't be possible), and on the next commit the message is swapped *back
> into place*. On the subsequent commit, the flush can never complete because
> the message is still in the outstanding list but has already been acked.
> This, in turn, makes it impossible to commit offsets, and results in
> duplicate messages even under clean bounces, where we should be able to get
> exactly-once delivery assuming no network delays or other issues.
> As a result of seeing this error, it became apparent that handling of
> WorkerSourceTask threads that do not shut down quickly enough was not working
> properly. The ShutdownableThread should get interrupted if it does not
> complete in time, but logs like the following would appear:
> {quote}
> [2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread
> WorkerSourceTask-verifiable-source-0
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,897] INFO Forcing shutdown of thread
> WorkerSourceTask-verifiable-source-0
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:18,897] ERROR Graceful stop of task
> WorkerSourceTask{id=verifiable-source-0} failed.
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:18,897] ERROR Failed to flush
> WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for
> producer to flush outstanding messages
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for
> rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> {quote}
> Actions in the background thread performing the commit continue to occur
> after it is supposedly interrupted. This is because InterruptedExceptions
> during the flush were being ignored (some time ago they were not even
> possible).
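
The synchronization problem above can be sketched with a simplified, hypothetical model of the flush bookkeeping (the class and method names here are illustrative, not the actual WorkerSourceTask API): every path that reads or swaps the outstanding/backlog maps or the flushing flag must hold the same lock, including the map swap in finishSuccessfulFlush().

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical, simplified sketch of the flush bookkeeping described in the
// issue. Not the real WorkerSourceTask code.
public class OutstandingTracker {
    // Messages sent but not yet acked; swapped with the backlog on flush.
    private Map<Object, Object> outstanding = new IdentityHashMap<>();
    // Messages sent while a flush is in progress are parked here.
    private Map<Object, Object> backlog = new IdentityHashMap<>();
    private boolean flushing = false;

    // Send path: must observe a consistent (outstanding, backlog, flushing)
    // triple, hence synchronized.
    public synchronized void recordSent(Object record) {
        if (flushing)
            backlog.put(record, record);
        else
            outstanding.put(record, record);
    }

    // Ack callback: removes the record from the outstanding set.
    public synchronized void recordAcked(Object record) {
        outstanding.remove(record);
    }

    public synchronized void beginFlush() {
        flushing = true;
    }

    // The fix: this swap and flag reset must also hold the lock. Without it,
    // a concurrent recordSent can see flushing=true while the maps are
    // mid-swap and "lose" its record into the wrong map.
    public synchronized void finishSuccessfulFlush() {
        Map<Object, Object> temp = outstanding;
        outstanding = backlog;
        backlog = temp;
        flushing = false;
    }

    public synchronized int outstandingCount() {
        return outstanding.size();
    }
}
```

With every method synchronized on the same monitor, a record sent mid-flush lands in the backlog and is cleanly promoted to the outstanding set when the flush completes, so it can never be orphaned across the swap.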
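
The interrupt half of the fix can be sketched the same way (again with hypothetical names, using a CountDownLatch as a stand-in for waiting on outstanding producer acks): when the forced shutdown interrupts the worker thread, the commit path must treat the InterruptedException as "abort the flush" rather than swallowing it and continuing to commit in the background.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the interrupt handling described in the issue.
public class FlushWithInterrupt {
    // Stand-in for waiting on outstanding producer acks.
    private final CountDownLatch acks = new CountDownLatch(1);

    // Returns true if the flush completed, false if it was aborted (timeout
    // or interrupt). Before the fix, the InterruptedException was ignored,
    // so the supposedly stopped thread kept committing in the background.
    public boolean commitOffsets(long timeoutMs) {
        try {
            if (!acks.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false; // timed out waiting for producer to flush
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;                       // abort the flush cleanly
        }
        return true;
    }
}
```

Re-asserting the interrupt status in the catch block matters: it lets the enclosing ShutdownableThread loop observe the interrupt and exit instead of looping back into another flush attempt.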