Hi

On 28.04.20 21:05, Guozhang Wang wrote:
> Thanks for the explanation Ben. They are very helpful.
> 
> Just to clarify on the context here:
> 
> 1) Before Kafka 2.0 the poll(long) call made sure that the rebalance would
> be completed when the call returned, no matter how long it took. Note this
> also includes the time for refreshing the metadata for the newly assigned
> partitions potentially. There was a lot of user feedback that this long
> polling does not obey the passed in "long" timeout at all since it has to
> block until the rebalance + metadata refresh is completed.
> 
> 2) So in Kafka 2.0 we introduced poll(Duration) which would practically be
> more strict in respecting the passed in timeout. It means, it could return
> while we are still in the middle of a rebalance. At the same time we
> deprecated the old overloaded function but we did not delete it, for users
> who do want blocking behavior and do not care about the potential long
> polling.
I think this is the precise explanation that is missing from the poll
Javadoc. It is important to understand that commitSync should not be
invoked in the middle of a rebalance.

> 3) Because of 2) above, before Kafka 2.5, when you call commit in the
> middle of a rebalance, it would throw a CommitFailedException. This is a
> fatal exception indicating that the partitions you're trying to commit are
> no longer assigned to you. However, in case of 2), it is not really a fatal
> error since we may still get the partitions back after we complete the
> rebalance. Hence throwing CommitFailed is "over-exaggerating" the situation
> to users.
This is the case I ran into, and because the CommitFailedException
message wrongly refers to a poll interval problem, it is hard to
understand (affecting 2.0 -> 2.4).

> 4) That's why in 2.5, we changed the code to let it throw a different
> RebalanceInProgress exception instead of CommitFailed, to indicate to users
> that you can, if possible, not treat it as a fatal error. *NOTE* it is
> usually suggested to trigger the commit during onPartitionsRevoked callback
> during which the rebalance is not considered "started" yet and hence it is
> safe to commit, and hence in between the two callbacks, when there are no
> records returned at all, you would not need to call `commit` from the
> consumer directly anyways. Also *NOTE* that in 2.5 we added another
> onPartitionsLost function which is triggered differently than
> onPartitionsRevoked: in the former we know that we've already lost those
> partitions and hence users should not try to commit any more.
Is it possible that in the case of a true CommitFailedException (where a
consumer doesn't respect the poll interval), the onPartitionsLost callback
is also invoked when closing the consumer?

> So the question is, did you call `commitSync` within the
> onPartitionsRevoked callback which gets the exception, or did you call
> `commitSync` after the poll call that gets the exception? If it is the
> former, then maybe you should override onPartitionsLost which would by
> default just call onPartitionsRevoked; if it is the latter, then it is
> possible that you've processed new records in the middle of a rebalance
> (otherwise there's no new offsets to be committed since you've just
> committed all offsets in the callback).
In my case I use the callback only to set some flags; the `commitSync`
is done after the poll invocation, during an application checkpoint.

This should continue to work, assuming that after a `poll(Duration)`
call that doesn't trigger any ConsumerRebalanceListener callbacks,
`commitSync` must not raise a RebalanceInProgress exception. Am I right?
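
The flag approach described in this thread could be sketched roughly as follows. This is only an illustration under stated assumptions: `RebalanceGuard` and `commitIfStable` are hypothetical names, not part of the Kafka client, and the class deliberately does not implement ConsumerRebalanceListener so that it compiles with the JDK alone. In a real consumer it would implement that interface (with `Collection<TopicPartition>` parameters) and the commit action would wrap `commitSync`. It does not by itself guarantee that `commitSync` never raises RebalanceInProgressException, so catching that exception in the checkpoint is probably still worthwhile.

```java
import java.util.Collection;

// Hypothetical helper, not part of the Kafka API. The three callback methods
// use the same names as ConsumerRebalanceListener; to plug it into a real
// consumer, implement that interface and change the parameter type to
// Collection<TopicPartition>.
final class RebalanceGuard {
    // True from onPartitionsRevoked/onPartitionsLost until the next
    // onPartitionsAssigned. A plain field is used on the assumption that the
    // callbacks and the checkpoint run on the same (polling) thread.
    private boolean rebalancing = false;

    public void onPartitionsRevoked(Collection<?> partitions) {
        rebalancing = true;
    }

    public void onPartitionsLost(Collection<?> partitions) {
        rebalancing = true;
    }

    public void onPartitionsAssigned(Collection<?> partitions) {
        rebalancing = false;
    }

    // The checkpoint should skip or postpone the commit while this is false.
    public boolean canCommit() {
        return !rebalancing;
    }

    // Runs the commit action only if no rebalance has been observed through
    // the callbacks; returns whether the action was run.
    public boolean commitIfStable(Runnable commit) {
        if (!canCommit()) {
            return false;
        }
        commit.run();
        return true;
    }
}
```

With this sketch, a checkpoint would call something like `guard.commitIfStable(() -> consumer.commitSync())` after `poll(Duration)` returns, and postpone the checkpoint when it returns false (here `guard` and `consumer` are assumed to be the application's own instances).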

Thank you, I appreciate your very precise answers.

Regards

ben



> Guozhang
> 
> 
> 
> On Tue, Apr 28, 2020 at 1:34 AM Benoit Delbosc <bdelb...@nuxeo.com> wrote:
> 
>> Hi Guozhang,
>> thanks for your reply
>>
>> On 27.04.20 19:17, Guozhang Wang wrote:
>>> Hello Ben,
>>>
>>> First of all, just to clarify your versioning should be (4.5.0 -> 2.5.0)
>>> and (4.3.1 -> 2.3.1) etc, right? Currently Apache Kafka's latest release
>>> is 2.5.0.
>> Sorry, you are right, I meant 2.3.1 and I upgraded to 2.5.0.
>>
>>
>>> The RebalanceInProgressException is only introduced in the most recent
>>> releases (2.5.0+), previously it is only used internally and would not be
>>> exposed via public APIs. It is for the case that when the consumer is
>>> undergoing a rebalance, it can still return some data and hence the
>>> consumer may still want to commit. Note that before this version consumer
>>> would NOT return any data during rebalance.
>>
>> I think there is another case where a consumer can receive this
>> exception, see below.
>>
>>> In addition we also use this
>>> exception instead of the fatal CommitFailedException when we know that
>>> the error is not actually "fatal"
>> in my case I got this exception without getting more data, see below
>>
>>> In your case, if the application semantics allows you to abort
>>> checkpointing, then you can capture the exception, calling poll() again
>>> and process the returned records which would not return duplicated data, and
>>> re-try checkpoint and commit (at a later position, since more records are
>>> processed now).
>>
>> Aborting a checkpoint means creating a duplicate in my application (let's
>> say it happens after a DB transaction commit). Once again, having
>> duplicates in failure cases is acceptable, but not under normal usage
>> where consumers respect their contract (poll interval and session timeout).
>>
>> So I prefer to avoid receiving RebalanceInProgressException.
>>
>> In my case, the problem is that the consumer poll times out just between
>> the onPartitionsRevoked and onPartitionsAssigned invocations. Because my
>> consumer code takes rebalancing into account only during
>> onPartitionsAssigned, it tries to commit while being revoked, which
>> produces the RebalanceInProgressException.
>>
>> The fix is to take rebalancing into account in the consumer code also
>> during the onPartitionsRevoked listener.
>>
>> Also, I think this problem was not present before Kafka 2.0 where
>> poll(Duration) was introduced; AFAIU poll(long) didn't time out during
>> rebalancing, ensuring that onPartitionsAssigned is always invoked.
>>
>> I would suggest improving the onPartitionsAssigned or poll(Duration)
>> Javadoc to make this clear, maybe as part of KAFKA-9882.
>>
>>
>> Regards
>>
>> ben
>>
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Apr 27, 2020 at 12:45 AM Benoit Delbosc <bdelb...@nuxeo.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am having a hard time understanding why a Consumer#commitSync can
>>>> randomly fail.
>>>>
>>>> Not being able to commit creates duplicates, which I can understand in
>>>> failure cases:
>>>>
>>>> - the poll interval is not respected max.poll.interval.ms (300s)
>>>>
>>>> - a consumer is stuck or dies -> no heartbeat during session.timeout.ms
>>>> (10s)
>>>>
>>>> - a rebalancing cannot be completed within rebalance.timeout.ms (60s)
>>>>
>>>> - Kafka cannot be reached within the default.api.timeout.ms (60s)
>>>>
>>>> - Kafka is itself in failure, for instance, zookeeper is not reachable
>>>> zookeeper.session.timeout.ms (6s)
>>>>
>>>>
>>>> But in a "normal" situation I expect that consumers can enter and leave
>>>> the group and commit without raising an exception.
>>>>
>>>> This is not what I am seeing using Kafka 4.3.1 (client and server), where
>>>> I randomly get a commit exception:
>>>>
>>>> CommitFailedException: Commit cannot be completed ... You can address this
>>>> either by increasing max.poll.interval.ms or by reducing the maximum size
>>>> of batches returned in poll() with max.poll.records.
>>>>
>>>>
>>>> Looking at the code and Jira I see that this exception message can
>>>> sometimes be wrong (I traced that the last poll invocation was 5s before
>>>> the exception) and that there is a fix around this in 4.5.0.
>>>> I have upgraded the client code (servers are still on 4.3.1) and now I
>>>> have a more specific exception:
>>>>
>>>> RebalanceInProgressException: Offset commit cannot be completed since the
>>>> consumer is undergoing a rebalance for auto partition assignment. You can
>>>> try completing the rebalance by calling poll() and then retry the operation.
>>>>
>>>>
>>>> I am unsure what to do with that. In my code a consumer commits during a
>>>> checkpoint procedure; retrying processing will create duplicates, which I
>>>> want to avoid.
>>>> Furthermore, calling poll from a checkpoint is weird, especially if I have
>>>> no guarantee that I can commit the same offset.
>>>>
>>>> My guess is that there is a race condition during rebalancing: the
>>>> consumer is calling poll and times out (no record), so the
>>>> onPartitionsAssigned listener is not notified, which could explain the
>>>> rebalance in progress commit exception.
>>>>
>>>> Am I missing something obvious to avoid this exception? How should it be
>>>> managed?
>>>>
>>>>
>>>> Thanks for your help.
>>>>
>>>> Regards
>>>>
>>>> ben
>>>>
>>>>
>>>
>>
> 
> 
