[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810166#comment-16810166 ]
Guozhang Wang commented on KAFKA-4600:
--------------------------------------

[~braedon] [~dana.powers] I think I may understand the issue better now, having written down the fixes for KAFKA-5154 and compared them with the issue reported here. My understanding is that, in this ticket, we want the consumer to NOT swallow the exception thrown from the callback, but to handle it accordingly (or re-throw it to users). Here are a few options we can consider (any of these changes can be discussed as part of a KIP, since it changes the consumer's semantics for users anyway):

0. As today, the consumer swallows the error, logs it, and proceeds as if the callback had completed (i.e. the partitions are still revoked / assigned within this rebalance generation).

1. We never capture the exception thrown from the callback; instead it is treated as a fatal error and thrown all the way up to the caller. The consumer instance shuts itself down as well.

2. We let the consumer handle it, especially under KIP-429, where we no longer revoke everything before joining the group. E.g. suppose the currently assigned partitions are {{1, 2}} and the newly assigned partitions are {{2, 3}}; the consumer will call onPartitionsAssigned(3) and onPartitionsRevoked(1). If the former succeeds but the latter fails with an error, we just let the consumer proceed with {{1, 2, 3}} as its assignment, since {{3}} was added successfully but {{1}} was not revoked successfully.

Personally I would avoid consumer-specific handling like 2) above, since it has to assume what "throwing an exception" from the callback means: does it mean none of the consumer's state was cleaned up / initialized, or only part of it? Between 0) and 1), I'm fine with 1), i.e. always treating it as a fatal error and killing the consumer. If [~braedon] likes this idea, please feel free to propose it as a new KIP changing the consumer's default behavior.
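The three options can be compared with a small, self-contained Java model of the example given for option 2. This is a hypothetical sketch, not the consumer's implementation: partitions are plain integers rather than `TopicPartition`, the names `RebalancePolicies`, `added`, `revoked`, `option0`, `option1` and `option2` are invented for illustration, and the two booleans stand in for whether `onPartitionsAssigned` and `onPartitionsRevoked` returned without throwing.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the three policies discussed above.
// Partitions are plain ints, not the real TopicPartition type from kafka-clients.
public class RebalancePolicies {

    // Partitions in target but not in current: what onPartitionsAssigned would receive.
    static Set<Integer> added(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(target);
        result.removeAll(current);
        return result;
    }

    // Partitions in current but not in target: what onPartitionsRevoked would receive.
    static Set<Integer> revoked(Set<Integer> current, Set<Integer> target) {
        Set<Integer> result = new TreeSet<>(current);
        result.removeAll(target);
        return result;
    }

    // Option 0: callback errors are logged and ignored, so the consumer ends up with target.
    static Set<Integer> option0(Set<Integer> target) {
        return new TreeSet<>(target);
    }

    // Option 1: any callback failure is fatal and is rethrown to the caller.
    static Set<Integer> option1(Set<Integer> target, boolean assignOk, boolean revokeOk) {
        if (!assignOk || !revokeOk) {
            throw new IllegalStateException("rebalance callback failed; consumer shuts down");
        }
        return new TreeSet<>(target);
    }

    // Option 2: apply only the parts of the rebalance whose callback succeeded.
    static Set<Integer> option2(Set<Integer> current, Set<Integer> target,
                                boolean assignOk, boolean revokeOk) {
        Set<Integer> result = new TreeSet<>(current);
        if (assignOk) {
            result.addAll(added(current, target));
        }
        if (revokeOk) {
            result.removeAll(revoked(current, target));
        }
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> current = Set.of(1, 2);
        Set<Integer> target = Set.of(2, 3);
        System.out.println("added   = " + added(current, target));   // [3]
        System.out.println("revoked = " + revoked(current, target)); // [1]

        // onPartitionsAssigned([3]) succeeds, onPartitionsRevoked([1]) throws:
        System.out.println("option 0 -> " + option0(target));                         // [2, 3]
        System.out.println("option 2 -> " + option2(current, target, true, false));   // [1, 2, 3]
        try {
            option1(target, true, false);
        } catch (IllegalStateException e) {
            System.out.println("option 1 -> " + e.getMessage());
        }
    }
}
```

In this model, option 2 can only keep or drop whole partitions, so it has no way to represent a callback that partially loaded or cleaned up state before throwing, which is the ambiguity raised above.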
> Consumer proceeds on when ConsumerRebalanceListener fails
> ---------------------------------------------------------
>
>                 Key: KAFKA-4600
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4600
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.1.1
>            Reporter: Braedon Vickers
>            Priority: Major
>
> One of the use cases for a ConsumerRebalanceListener is to load state
> necessary for processing a partition when it is assigned. However, when
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e.
> the state isn't loaded), the error is logged and the consumer proceeds on as
> if nothing happened, happily consuming messages from the new partition. When
> the state is relied upon for correct processing, this can be very bad, e.g.
> data loss can occur.
>
> It would be better if the error was propagated up so it could be dealt with
> normally. At the very least the assignment should fail so the consumer
> doesn't see any messages from the new partitions, and the rebalance can be
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)