[
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088
]
Richard Yu edited comment on KAFKA-8421 at 6/5/19 9:32 PM:
-----------------------------------------------------------
[~guozhang] There are a couple of simple approaches we could try to help
prevent the consumer from dropping out of the group during a rebalance:
# We could artificially extend the rebalance timeout to accommodate the time
spent in poll(), but since that goes against the design described in
[KIP-62|https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread],
I don't think this is much of an option.
# Alternatively, we could attempt an early termination of the record fetch
request. In this scenario, we would cancel the request on the broker side, but
by the time the user calls poll() again, the rebalance timeout may already
have expired.
What we could do instead is the following. A poll() call that begins during a
rebalance would not block for the entire timeout supplied by the user.
Instead, the user timeout would be split into smaller chunks of time, between
which the consumer checks whether a rejoin is needed (e.g. a wait of 50
seconds is split into intervals of 5 seconds). If a rejoin is needed, we would
send a JoinGroupRequest.
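To make the chunking idea concrete, here is a minimal sketch. It is not the actual consumer code: the class ChunkedPollSketch and its methods are hypothetical names made up for illustration, and the rejoin check and the wait are passed in as callbacks rather than tied to any real client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

/** Hypothetical sketch; none of these names exist in the Kafka client. */
final class ChunkedPollSketch {

    /** Splits a total wait into slices of at most sliceMs each. */
    static List<Long> slices(long totalMs, long sliceMs) {
        if (totalMs < 0 || sliceMs <= 0) {
            throw new IllegalArgumentException("totalMs must be >= 0 and sliceMs > 0");
        }
        List<Long> out = new ArrayList<>();
        long remaining = totalMs;
        while (remaining > 0) {
            long next = Math.min(sliceMs, remaining);
            out.add(next);
            remaining -= next;
        }
        return out;
    }

    /**
     * Waits slice by slice and checks for a needed rejoin before each slice.
     * Returns the number of slices that were waited before the loop stopped,
     * either because a rejoin was needed or because the user timeout ran out.
     */
    static int waitWithRejoinChecks(long totalMs, long sliceMs,
                                    BooleanSupplier rejoinNeeded,
                                    LongConsumer waitFor) {
        int waited = 0;
        for (long slice : slices(totalMs, sliceMs)) {
            if (rejoinNeeded.getAsBoolean()) {
                // This is where the consumer would send a JoinGroupRequest.
                return waited;
            }
            waitFor.accept(slice); // stand-in for one short blocking fetch
            waited++;
        }
        return waited;
    }
}
```

With a 50 second timeout and 5 second slices, the consumer would notice a pending rejoin after at most one slice instead of after the full timeout.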
On the broker side, we could modify the logic so that any pending fetch
requests are respected and allowed to run to completion. The JoinGroupResponse
for a particular consumer would be held back until its fetch request has
completed, at which point we would send the (delayed) JoinGroupResponse.
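The broker-side ordering could look roughly like the sketch below. This is only an illustration of "hold the join response until the member's fetch completes": DeferredJoinSketch, registerFetch and handleJoin are hypothetical names, the response is a plain string, and the real group coordinator is structured quite differently.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; this is not the group coordinator's real API. */
final class DeferredJoinSketch {

    // In-flight fetch per member id; the future completes once the fetch is answered.
    private final Map<String, CompletableFuture<Void>> pendingFetches =
            new ConcurrentHashMap<>();

    /** Records an in-flight fetch for the member and returns its completion handle. */
    CompletableFuture<Void> registerFetch(String memberId) {
        CompletableFuture<Void> fetch = new CompletableFuture<>();
        pendingFetches.put(memberId, fetch);
        // Drop the bookkeeping entry once the fetch has finished.
        fetch.whenComplete((ignored, error) -> pendingFetches.remove(memberId, fetch));
        return fetch;
    }

    /**
     * Returns a future for the member's join response. It completes immediately
     * when no fetch is pending, otherwise only after the pending fetch completes.
     */
    CompletableFuture<String> handleJoin(String memberId) {
        CompletableFuture<Void> fetch = pendingFetches.getOrDefault(
                memberId, CompletableFuture.completedFuture(null));
        return fetch.thenApply(ignored -> "JoinGroupResponse(" + memberId + ")");
    }
}
```

A real implementation would also need an upper bound on how long the response can be delayed, so that a slow fetch cannot stall the whole rebalance.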
I think this is a good way to avoid breaking the guarantees that Kafka has in
place (particularly with regard to the rebalance timeout). WDYT of this
approach?
> Allow consumer.poll() to return data in the middle of rebalance
> ---------------------------------------------------------------
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Guozhang Wang
> Priority: Major
>
> With KIP-429 in place, when a consumer is about to send a join-group
> request, its owned partitions may not be empty, meaning that some of its
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
>     return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance, it always returns no data.
> As an optimization, we can consider letting the consumer still return
> messages that belong to its owned partitions even while it is within a
> rebalance, because we know this is safe: no one else will claim those
> partitions in this rebalance yet, and we can still commit offsets if the
> partitions need to be revoked after this rebalance.
> One thing we need to take care of, though, is the rebalance timeout: while
> the consumer is processing those records, it may not call the next poll() in
> time (think: Kafka Streams num.iterations mechanism), which may lead to the
> consumer dropping out of the group during the rebalance.
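
For reference, the difference between the strict behavior and the optimization described in the issue can be sketched as follows. This is a simplified illustration, not the consumer's real code: RebalancePollSketch, strictPoll and relaxedPoll are hypothetical names, and partitions and records are modeled as plain strings instead of TopicPartition and ConsumerRecord.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; partitions and records are plain strings here. */
final class RebalancePollSketch {

    /** Strict behavior: return nothing at all while a rebalance is in progress. */
    static Map<String, List<String>> strictPoll(boolean rebalanceInProgress,
                                                Map<String, List<String>> buffered) {
        return rebalanceInProgress ? Collections.emptyMap() : buffered;
    }

    /**
     * Relaxed behavior: even during a rebalance, return buffered records for
     * partitions the consumer still owns, and hold back everything else.
     */
    static Map<String, List<String>> relaxedPoll(Set<String> ownedPartitions,
                                                 Map<String, List<String>> buffered) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : buffered.entrySet()) {
            if (ownedPartitions.contains(entry.getKey())) {
                out.put(entry.getKey(), entry.getValue());
            }
        }
        return out;
    }
}
```

The sketch deliberately leaves out the rebalance timeout concern raised in the issue: returning records only helps if the caller still gets back to poll() in time.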
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)