[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810166#comment-16810166
 ] 

Guozhang Wang commented on KAFKA-4600:
--------------------------------------

[~braedon] [~dana.powers] I think I may understand the issue better now, while 
writing down the fixes of KAFKA-5154 and comparing it with the reported issue 
here. My understanding is that, in this ticket, we want to NOT swallow the 
exception thrown from the callback but let Consumer to handle it accordingly 
(or re-throw it to users). Here's a few options we can consider (any of these 
changes can be discussed as part of a KIP since it changes the consumer's 
semantics to users anyways):

0. As today, consumer swallows the error and log it and proceed as like the 
callback completes (i.e. the partitions are still revoked / assigned within 
this rebalance generation).
1. We never capture the exception thrown from the callback, instead it was 
treated as a fatal error and hence thrown all the way to the caller. The 
consumer instance will kill itself as well.
2. We let consumer to handle it, especially under KIP-429 when we no longer 
revoke everything before joining the group: e.g. suppose your current assigned 
partitions are {{1,2}}, and the newly assigned partitions are {{2, 3}}, the 
consumer will call onPartitionsAssigned(3) and onPartitionsRevoked(1). Suppose 
the former succeeds but the latter failed with an error, we just let the 
consumer to proceed as assign with {{1, 2, 3}} since the {{3}} is added 
successfully but {{1}} is revoked unsuccessfully.

Personally I would avoid consumer specific handling like 2) above, since it has 
to assume what does "throwing an exception" mean from the callback: does it 
mean none of the consumer's state was cleaned / initiated, or partly cleaned / 
initiated? Comparing with 0 and 1 I'm fine with 1), i.e. just always treat it 
as a fatal error and kill the consumer. If [~braedon] likes this idea, please 
feel free to propose it as a new KIP changing the default behavior of the 
consumer.


> Consumer proceeds on when ConsumerRebalanceListener fails
> ---------------------------------------------------------
>
>                 Key: KAFKA-4600
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4600
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.1.1
>            Reporter: Braedon Vickers
>            Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to