[jira] [Commented] (KAFKA-2867) Missing synchronization and improperly handled InterruptException in WorkerSourceTask

2015-11-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15018434#comment-15018434
 ] 

ASF GitHub Bot commented on KAFKA-2867:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/566


> Missing synchronization and improperly handled InterruptException in 
> WorkerSourceTask
> -
>
> Key: KAFKA-2867
> URL: https://issues.apache.org/jira/browse/KAFKA-2867
> Project: Kafka
>  Issue Type: Bug
>  Components: copycat
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case 
> (when the flush did not even need to be started), this is fine because the 
> call already happens inside a synchronized block. However, the other call 
> site is outside the synchronized block.
> The result was transient failures of the system test that cleanly bounces 
> copycat nodes. The bug doesn't cause exceptions because 
> finishSuccessfulFlush() only swaps two maps and sets a flag to false. 
> However, because the two maps that track outstanding messages are swapped, 
> we could by chance also be in the middle of sending a message. If that 
> message gets added to the backlog queue just as the flushing flag is 
> toggled, we can temporarily "lose" it in the backlog queue. We'll then get 
> a callback that logs an error because it can't find a record of the acked 
> message (which, if it ever appears, should be considered a critical issue 
> since it shouldn't be possible), and on the next commit the message is 
> swapped *back into place*. On the subsequent commit, the flush can never 
> complete because the message is still in the outstanding list even though 
> it has already been acked. This, in turn, makes it impossible to commit 
> offsets and results in duplicate messages even under clean bounces, where 
> we should get exactly-once delivery assuming no network delays or other 
> issues.
> As a result of seeing this error, it became apparent that handling of 
> WorkerSourceTask threads that do not complete quickly enough was not 
> working properly. The ShutdownableThread should be interrupted if it does 
> not complete quickly enough, but logs like the following would appear:
> {quote}
> [2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0 
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread 
> WorkerSourceTask-verifiable-source-0 
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0} 
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store 
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,897] INFO Forcing shutdown of thread 
> WorkerSourceTask-verifiable-source-0 
> (org.apache.kafka.connect.util.ShutdownableThread)
> [2015-11-18 01:02:18,897] ERROR Graceful stop of task 
> WorkerSourceTask{id=verifiable-source-0} failed. 
> (org.apache.kafka.connect.runtime.Worker)
> [2015-11-18 01:02:18,897] ERROR Failed to flush 
> WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for 
> producer to flush outstanding messages 
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store 
> (org.apache.kafka.connect.storage.OffsetStorageWriter)
> [2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for 
> rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> {quote}
> Actions in the background thread performing the commit continue to occur 
> after it has supposedly been interrupted. This is because 
> InterruptedExceptions during the flush were being ignored (some time ago 
> they were not even possible). Instead, any interruption by the main thread 
> trying to shut down the worker thread in preparation for a rebalance should 
> be handled by failing the commit operation and returning, so the thread can 
> exit cleanly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2867) Missing synchronization and improperly handled InterruptException in WorkerSourceTask

2015-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015243#comment-15015243
 ] 

ASF GitHub Bot commented on KAFKA-2867:
---

GitHub user ewencp opened a pull request:

https://github.com/apache/kafka/pull/566

KAFKA-2867: Fix missing WorkerSourceTask synchronization and handling of 
InterruptException.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewencp/kafka 
kafka-2867-fix-source-sync-and-interrupt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #566


commit 3a610d4f24f9e102458f335d98c420a563c08aad
Author: Ewen Cheslack-Postava 
Date:   2015-11-19T21:37:17Z

KAFKA-2867: Fix missing WorkerSourceTask synchronization and handling of 
InterruptException.




> Missing synchronization and improperly handled InterruptException in 
> WorkerSourceTask
> -
>
> Key: KAFKA-2867
> URL: https://issues.apache.org/jira/browse/KAFKA-2867
> Project: Kafka
>  Issue Type: Bug
>  Components: copycat
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.9.0.0, 0.9.1.0
>
>