[ https://issues.apache.org/jira/browse/KAFKA-2748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14992060#comment-14992060 ]

ASF GitHub Bot commented on KAFKA-2748:
---------------------------------------

GitHub user ewencp opened a pull request:

    https://github.com/apache/kafka/pull/431

    KAFKA-2748: Ensure sink tasks commit offsets upon rebalance and rewind if the SinkTask flush fails.

    Also fix the incorrect consumer group ID setting, which was giving each task its
    own group instead of one for the entire sink connector.
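To see why a per-task group ID is a bug, here is a toy model of consumer-group assignment in plain Java, so it runs without Kafka. It is only a sketch under stated assumptions: partitions are split round-robin inside each group (the real consumer uses a configurable assignor), and the "connect-" naming scheme and all method names are hypothetical, not taken from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of consumer-group assignment. The "connect-" naming scheme and all
// method names are hypothetical, chosen only to illustrate the bug.
public class SinkGroupIdSketch {

    // Buggy variant: the group ID depends on the task, so each task is alone in its group.
    static String groupIdPerTask(String connector, int taskId) {
        return "connect-" + connector + "-" + taskId;
    }

    // Fixed variant: every task of a connector shares one group ID.
    static String groupIdPerConnector(String connector) {
        return "connect-" + connector;
    }

    // Every group receives every partition; within a group, partitions are
    // dealt round-robin to its members. Returns task -> owned partitions.
    static Map<Integer, List<Integer>> assign(List<String> groupOfTask, int numPartitions) {
        Map<String, List<Integer>> members = new TreeMap<>();
        Map<Integer, List<Integer>> owned = new TreeMap<>();
        for (int task = 0; task < groupOfTask.size(); task++) {
            members.computeIfAbsent(groupOfTask.get(task), g -> new ArrayList<>()).add(task);
            owned.put(task, new ArrayList<>());
        }
        for (List<Integer> group : members.values()) {
            for (int p = 0; p < numPartitions; p++) {
                owned.get(group.get(p % group.size())).add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> perTask = new ArrayList<>();
        List<String> perConnector = new ArrayList<>();
        for (int task = 0; task < 3; task++) {
            perTask.add(groupIdPerTask("my-sink", task));
            perConnector.add(groupIdPerConnector("my-sink"));
        }
        System.out.println(assign(perTask, 4));      // {0=[0, 1, 2, 3], 1=[0, 1, 2, 3], 2=[0, 1, 2, 3]}
        System.out.println(assign(perConnector, 4)); // {0=[0, 3], 1=[1], 2=[2]}
    }
}
```

With one group per task, every task reads every partition, so each record reaches the sink once per task; with a shared group, each partition has exactly one owner within the connector.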

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ewencp/kafka kafka-2748-sink-task-rebalance-commit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #431
    
----
commit 4fc35c2fd6f358b9d234bf9338a7ef342c87af83
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Date:   2015-11-05T01:04:56Z

    KAFKA-2748: Ensure sink tasks commit offsets upon rebalance and rewind if the SinkTask flush fails.
    
    Also fix the incorrect consumer group ID setting, which was giving each task its
    own group instead of one for the entire sink connector.

----


> SinkTasks do not handle rebalances and offset commit properly
> -------------------------------------------------------------
>
>                 Key: KAFKA-2748
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2748
>             Project: Kafka
>          Issue Type: Bug
>          Components: copycat
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>             Fix For: 0.9.0.0
>
>
> Because the initial SinkTask code was written against an early version of the
> new consumer, it wasn't set up to handle rebalances properly. Now that we have
> added the rebalance listener, we can use it to commit offsets correctly.
> However, the existing code also has two other issues. First, when the sink
> task fails to flush data, we do not rewind to the last committed offsets. We
> need to do this because we cannot be sure what happened to the outstanding
> data, so we must reprocess it.
> Second, flushing on stop was not handled properly. The existing code assumed
> that we would flush as part of SinkTask.stop(). However, this did not make
> sense because SinkTask.stop() was invoked before the worker thread had
> stopped, so we could end up committing the wrong offsets. Instead, we need to
> wait for the worker thread to finish whatever it is currently doing, perform
> one final flush and offset commit, and only then invoke stop() to let the
> task do its final cleanup. This is a bit confusing because stop means
> different things for source and sink tasks, since they have pull vs. push
> semantics.
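The ordering described in the issue (commit when partitions are revoked, rewind to the last committed offsets when flush fails, and on shutdown do one final flush and commit before calling the task's stop()) can be modeled without a broker. The sketch below is a single-threaded, dependency-free approximation, not the actual worker code: Worker, Task, deliver and commitOffsets are hypothetical names, partitions are plain strings, and offsets are longs. In the real worker, the revoke hook would presumably be the consumer's ConsumerRebalanceListener.onPartitionsRevoked, and rewinding would go through KafkaConsumer.seek.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of the sink worker's commit logic.
// Partitions are strings; offsets are "next offset to consume" values.
public class SinkCommitSketch {

    // Stand-in for a sink task: flush() may fail, stop() does final cleanup.
    interface Task {
        void flush(Map<String, Long> offsets) throws Exception;
        void stop();
    }

    static class Worker {
        final Task task;
        final Map<String, Long> position = new HashMap<>();  // consumer fetch position
        final Map<String, Long> committed = new HashMap<>(); // last committed offsets
        final Map<String, Long> current = new HashMap<>();   // offsets handed to the task

        Worker(Task task) { this.task = task; }

        // Newly assigned partition: resume from its last committed offset.
        void onPartitionsAssigned(String partition, long committedOffset) {
            committed.put(partition, committedOffset);
            position.put(partition, committedOffset);
            current.put(partition, committedOffset);
        }

        // Records before nextOffset have been passed to the task.
        void deliver(String partition, long nextOffset) {
            position.put(partition, nextOffset);
            current.put(partition, nextOffset);
        }

        // Flush first and commit only if the flush succeeded.
        boolean commitOffsets() {
            Map<String, Long> snapshot = new HashMap<>(current);
            try {
                task.flush(snapshot);
            } catch (Exception e) {
                // We cannot tell what happened to the outstanding data, so
                // rewind each partition to its last committed offset.
                for (String p : snapshot.keySet()) {
                    position.put(p, committed.get(p));
                    current.put(p, committed.get(p));
                }
                return false;
            }
            committed.putAll(snapshot);
            return true;
        }

        // Before partitions move to another task, commit what we have and
        // stop tracking them (simplified: every partition is revoked).
        void onPartitionsRevoked() {
            commitOffsets();
            current.clear();
            position.clear();
        }

        // Called only after the worker loop has exited: one final flush and
        // commit, and only then let the task clean up.
        void stop() {
            commitOffsets();
            task.stop();
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        boolean[] failNextFlush = {true};
        Worker worker = new Worker(new Task() {
            public void flush(Map<String, Long> offsets) throws Exception {
                events.add("flush");
                if (failNextFlush[0]) {
                    failNextFlush[0] = false;
                    throw new Exception("simulated flush failure");
                }
            }
            public void stop() { events.add("stop"); }
        });
        worker.onPartitionsAssigned("topic-0", 100L);
        worker.deliver("topic-0", 150L);
        worker.commitOffsets();                  // flush fails: rewind
        System.out.println(worker.position);     // {topic-0=100}
        worker.deliver("topic-0", 150L);         // reprocess the same records
        worker.commitOffsets();                  // flush succeeds: commit 150
        worker.deliver("topic-0", 170L);
        worker.stop();                           // final flush + commit, then stop
        System.out.println(worker.committed);    // {topic-0=170}
        System.out.println(events);              // [flush, flush, flush, stop]
    }
}
```

The design choice the model highlights is that offsets are committed only after a successful flush. A failed flush leaves the committed offsets untouched, so rewinding to them can cause reprocessing (at-least-once delivery) but never skips data.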



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)