[ 
https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14968350#comment-14968350
 ] 

Jiangjie Qin commented on KAFKA-2674:
-------------------------------------

[~guozhang] That is a valid concern; I had not thought it through before. The 
question is which states could change and whether the changes matter. From the 
consumer's point of view, only two states matter: 1) partition assignment and 
2) consumer offsets.

In most cases, users do not care about partition assignment. In cases where 
they do, the assignment after close is empty, so it seems users do not really 
lose anything here.

If users are using auto commit, they do not really care about the timing of 
the offset commit as long as the correct offsets are committed. When the 
consumer closes, the correct offsets to commit are the consumed offsets, and 
the consumed offsets will not change unless the user calls poll(). If users 
don't want the committed offsets to change across the close, they can either 
call commitSync() themselves before closing the consumer or disable auto 
commit, depending on the use case.
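
To make the "commit explicitly, then close" option concrete, here is a minimal 
sketch in Java. It is an illustration only: OffsetConsumer, RecordingConsumer 
and commitAndClose() are hypothetical names introduced for this example and are 
not part of the Kafka client. The interface stands in for the two consumer 
calls discussed above, so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitBeforeClose {

    /** Hypothetical stand-in for the two consumer calls discussed in this thread. */
    interface OffsetConsumer {
        void commitSync(); // blocking commit of the consumed offsets
        void close();      // releases the consumer
    }

    /** Commits explicitly, then closes; close() still runs if the commit throws. */
    static void commitAndClose(OffsetConsumer consumer) {
        try {
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }

    /** Records the order of calls so the sketch can be checked without a broker. */
    static class RecordingConsumer implements OffsetConsumer {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commitSync() {
            calls.add("commitSync");
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer();
        commitAndClose(consumer);
        System.out.println(consumer.calls); // prints [commitSync, close]
    }
}
```

The try/finally is a design choice for the sketch: a failed commit should not 
leave the consumer open. Whether a failed commit should also be retried or 
logged depends on the application.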

On the other hand, if users are not using auto commit, we will not commit the 
offsets, so the state won't change.

So I think we are OK here. Do you have some specific use case that breaks?

> ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer 
> close
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-2674
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2674
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: Michal Turek
>            Assignee: Neha Narkhede
>
> Hi, I'm investigating and testing the behavior of the new consumer from the 
> planned 0.9 release and found an inconsistency in how the rebalance 
> callbacks are called.
> I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called 
> during consumer close and application shutdown. Its Javadoc contract says:
> - "This method will be called before a rebalance operation starts and after 
> the consumer stops fetching data."
> - "It is recommended that offsets should be committed in this callback to 
> either Kafka or a custom offset store to prevent duplicate data."
> I believe calling consumer.close() starts a rebalance operation, and even 
> the local consumer that is closing should be notified so that it can run 
> any rebalance logic, including the offset commit (e.g. if auto-commit is 
> disabled).
> Below are annotated logs of the current and expected behavior.
> {noformat}
> // Application start
> 2015-10-20 15:14:02.208 INFO  o.a.k.common.utils.AppInfoParser    
> [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT 
> (AppInfoParser.java:82)
> 2015-10-20 15:14:02.208 INFO  o.a.k.common.utils.AppInfoParser    
> [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c 
> (AppInfoParser.java:83)
> // Consumer started (the first one in group), rebalance callbacks are called 
> including empty onPartitionsRevoked()
> 2015-10-20 15:14:02.333 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, revoked: [] 
> (TestConsumer.java:95)
> 2015-10-20 15:14:02.343 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, 
> testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] 
> (TestConsumer.java:100)
> // Another consumer joined the group, rebalancing
> 2015-10-20 15:14:17.345 INFO  o.a.k.c.c.internals.Coordinator     
> [TestConsumer-worker-0]: Attempt to heart beat failed since the group is 
> rebalancing, try to re-join group. (Coordinator.java:714)
> 2015-10-20 15:14:17.346 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, 
> testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] 
> (TestConsumer.java:95)
> 2015-10-20 15:14:17.349 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, 
> testB-4, testA-3] (TestConsumer.java:100)
> // Consumer started closing, there SHOULD be onPartitionsRevoked() callback 
> to commit offsets like during standard rebalance, but it is missing
> 2015-10-20 15:14:39.280 INFO  c.a.e.kafka.newapi.TestConsumer     [main]: 
> Closing instance (TestConsumer.java:42)
> 2015-10-20 15:14:40.264 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89)
> {noformat}
> The workaround is to call onPartitionsRevoked() explicitly just before 
> calling consumer.close(), but that seems dirty and error-prone to me. It 
> can easily be forgotten by someone without such experience.



