Michal Turek created KAFKA-2674:
-----------------------------------

             Summary: ConsumerRebalanceListener.onPartitionsRevoked() is not 
called on consumer close
                 Key: KAFKA-2674
                 URL: https://issues.apache.org/jira/browse/KAFKA-2674
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.0
            Reporter: Michal Turek
            Assignee: Neha Narkhede


Hi, I'm investigating and testing the behavior of the new consumer from the 
planned 0.9 release and found an inconsistency in how the rebalance callbacks 
are called.

I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called 
during consumer close and application shutdown. Its Javadoc contract says:

- "This method will be called before a rebalance operation starts and after the 
consumer stops fetching data."
- "It is recommended that offsets should be committed in this callback to 
either Kafka or a custom offset store to prevent duplicate data."

I believe calling consumer.close() starts a rebalance operation, so even the 
local consumer that is closing should be notified. That would let it run any 
rebalance logic, including committing offsets (e.g. if auto-commit is 
disabled).

Below are annotated logs showing the current and the expected behavior.

{noformat}
// Application start
2015-10-20 15:14:02.208 INFO  o.a.k.common.utils.AppInfoParser    
[TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT 
(AppInfoParser.java:82)
2015-10-20 15:14:02.208 INFO  o.a.k.common.utils.AppInfoParser    
[TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c 
(AppInfoParser.java:83)

// Consumer started (the first one in group), rebalance callbacks are called 
including empty onPartitionsRevoked()
2015-10-20 15:14:02.333 INFO  c.a.e.kafka.newapi.TestConsumer     
[TestConsumer-worker-0]: Rebalance callback, revoked: [] (TestConsumer.java:95)
2015-10-20 15:14:02.343 INFO  c.a.e.kafka.newapi.TestConsumer     
[TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, 
testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] 
(TestConsumer.java:100)

// Another consumer joined the group, rebalancing
2015-10-20 15:14:17.345 INFO  o.a.k.c.c.internals.Coordinator     
[TestConsumer-worker-0]: Attempt to heart beat failed since the group is 
rebalancing, try to re-join group. (Coordinator.java:714)
2015-10-20 15:14:17.346 INFO  c.a.e.kafka.newapi.TestConsumer     
[TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, 
testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] 
(TestConsumer.java:95)
2015-10-20 15:14:17.349 INFO  c.a.e.kafka.newapi.TestConsumer     
[TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, 
testB-4, testA-3] (TestConsumer.java:100)

// Consumer started closing, there SHOULD be an onPartitionsRevoked() callback 
to commit offsets, as during a standard rebalance, but it is missing
2015-10-20 15:14:39.280 INFO  c.a.e.kafka.newapi.TestConsumer     [main]: 
Closing instance (TestConsumer.java:42)
2015-10-20 15:14:40.264 INFO  c.a.e.kafka.newapi.TestConsumer     
[TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89)
{noformat}

A workaround is to call onPartitionsRevoked() explicitly just before calling 
consumer.close(), but that seems dirty and error-prone to me. It can easily be 
forgotten by someone without such experience.
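
For illustration, the workaround could be wrapped in a small helper so that it 
is harder to forget. This is only a minimal sketch: RebalanceListener, 
SimpleConsumer, FakeConsumer, RecordingListener and closeWithRevoke() are 
hypothetical stand-ins written for this example, not Kafka client classes, and 
partitions are plain strings. With the real client the same idea would 
presumably pass consumer.assignment() to the registered 
ConsumerRebalanceListener, on the thread that owns the consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CloseWithRevokeSketch {

    // Hypothetical stand-in for ConsumerRebalanceListener (partitions as strings).
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
    }

    // Hypothetical stand-in for the two consumer operations the workaround needs.
    interface SimpleConsumer {
        Set<String> assignment();
        void close();
    }

    // The workaround: hand the current assignment to the listener, then close.
    // The finally block closes the consumer even if the callback throws.
    static void closeWithRevoke(SimpleConsumer consumer, RebalanceListener listener) {
        try {
            listener.onPartitionsRevoked(new ArrayList<>(consumer.assignment()));
        } finally {
            consumer.close();
        }
    }

    // In-memory fake that records when it is closed.
    static class FakeConsumer implements SimpleConsumer {
        private final List<String> events;
        private final Set<String> assigned;

        FakeConsumer(List<String> events, String... partitions) {
            this.events = events;
            this.assigned = new LinkedHashSet<>(Arrays.asList(partitions));
        }

        @Override
        public Set<String> assignment() {
            return assigned;
        }

        @Override
        public void close() {
            events.add("closed");
        }
    }

    // Listener that records callbacks; a real one would commit offsets here.
    static class RecordingListener implements RebalanceListener {
        private final List<String> events;

        RecordingListener(List<String> events) {
            this.events = events;
        }

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            events.add("revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            events.add("assigned: " + partitions);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        SimpleConsumer consumer =
                new FakeConsumer(events, "testB-3", "testA-4", "testB-4", "testA-3");
        closeWithRevoke(consumer, new RecordingListener(events));
        // The revoke callback is recorded before the close.
        System.out.println(events);
    }
}
```

Even with such a helper, every application has to remember to use it instead of 
calling close() directly, which is why I think the callback belongs in the 
consumer itself.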



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
