[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-14196:
-------------------------------
    Description: 
Several flaky tests under OffsetValidationTest indicate a potential 
consumer duplication issue when autocommit is enabled.  I believe this 
affects *3.2* and onward.  The failure message is shown below:

 
{code:java}
Total consumed records 3366 did not match consumed position 3331 {code}
 

After investigating the log, I discovered that, for the failing tests, the 
progress made between the start of a rebalance event and the async commit was 
lost: the records consumed in that window are delivered again after the 
partition is reassigned.  In the example below, the rebalance event kicks in at 
around 1662054846995 (first record), and the async commit of offset 3739 
completes at around 1662054847015 (right before partitions_revoked).  Offsets 
3739-3745 are then consumed a second time after partitions_assigned.

 
{code:java}
{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
{"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
 {code}
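The re-delivery in the excerpt above can be checked mechanically.  The following is a minimal sketch in Python, not part of the test suite: the function name reconsumed_offsets is made up for illustration, and it assumes each records_consumed event covers the contiguous range minOffset..maxOffset (which matches the counts in the excerpt).

```python
import json


def reconsumed_offsets(lines):
    """Return {(topic, partition): sorted offsets} for offsets that appear
    in more than one records_consumed event.

    Assumption: each event covers the contiguous range minOffset..maxOffset.
    """
    seen = {}
    dupes = {}
    for line in lines:
        event = json.loads(line)
        if event.get("name") != "records_consumed":
            continue  # ignore partitions_revoked / partitions_assigned
        for part in event["partitions"]:
            key = (part["topic"], part["partition"])
            offsets = set(range(part["minOffset"], part["maxOffset"] + 1))
            already = seen.setdefault(key, set())
            overlap = offsets & already
            if overlap:
                dupes.setdefault(key, set()).update(overlap)
            already |= offsets
    return {key: sorted(values) for key, values in dupes.items()}


# The six events from the excerpt above, copied verbatim.
log = [
    '{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}',
    '{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}',
    '{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}',
    '{"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}',
    '{"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}',
    '{"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}',
]

duplicates = reconsumed_offsets(log)
print(duplicates)
```

For this excerpt the sketch reports offsets 3739 through 3745 on test_topic partition 0, i.e. the seven records consumed before partitions_revoked.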
A few things to note here:
 # Manually calling commitSync in the onPartitionsRevoked callback seems to 
alleviate the issue.
 # Setting includeMetadataInTimeout to false also seems to alleviate the issue.

These experiments seem to suggest that the contract between poll() and 
asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the previously 
fetched data, and the consumer would (try to) commit these offsets in the 
current poll() loop.  However, it seems that as poll() continues to loop, the 
"acked" data isn't being committed.
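
The suspected sequence can be illustrated with a toy model.  This is only a sketch under stated assumptions, not Kafka client code: simulate_rebalance and its parameters are hypothetical names, the batch sizes and starting offset are taken from the log excerpt, and the model simply assumes that no offset commit lands between the start of the rebalance and the revocation unless a blocking commit is forced.

```python
def simulate_rebalance(batches, committed, commit_on_revoke):
    """Toy model of a single partition across one rebalance.

    batches          -- record counts returned by successive poll() calls
    committed        -- last committed offset when the rebalance starts
    commit_on_revoke -- whether a blocking commit runs before revocation

    Returns (offset consumption resumes from, offsets delivered twice).
    """
    position = committed
    consumed = []
    for count in batches:
        # poll() keeps handing out records while the rebalance is pending,
        # but (by assumption) no further commit completes in this window.
        consumed.extend(range(position, position + count))
        position += count
    if commit_on_revoke:
        # Models a blocking commit of the current position before the
        # partition is revoked.
        committed = position
    # After reassignment, consumption resumes from the committed offset.
    resume_from = committed
    redelivered = [offset for offset in consumed if offset >= resume_from]
    return resume_from, redelivered


without_commit = simulate_rebalance([3, 2, 2], 3739, commit_on_revoke=False)
with_commit = simulate_rebalance([3, 2, 2], 3739, commit_on_revoke=True)
print(without_commit)
print(with_commit)
```

Without the blocking commit, the model resumes from 3739 and redelivers 3739 through 3745, matching the excerpt; with it, consumption resumes from 3746 and nothing is redelivered.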

 

I believe this could have been introduced in 
https://issues.apache.org/jira/browse/KAFKA-14024, which originated from 
https://issues.apache.org/jira/browse/KAFKA-13310.

More specifically (see the comments below), the ConsumerCoordinator will always 
return before the async commit, due to the previous incomplete commit.  However, 
the requirements here are somewhat contradictory:
 # I think we want to commit asynchronously while the poll continues, and if we 
do that, we are back to KAFKA-14024, where the consumer hits the rebalance 
timeout and gets kicked out of the group.
 # But we also need to commit all the "acked" offsets before revoking the 
partition, and this has to be blocking.

  was:
Several flaky tests under OffsetValidationTest indicate a potential 
consumer duplication issue when autocommit is enabled.  The failure message 
is shown below:

 
{code:java}
Total consumed records 3366 did not match consumed position 3331 {code}
 

After investigating the log, I discovered that the data consumed between the 
start of a rebalance event and the async commit was lost for those failing 
tests.  In the example below, the rebalance event kicks in at around 
1662054846995 (first record), and the async commit of the offset 3739 is 
completed at around 1662054847015 (right before partitions_revoked).

 
{code:java}
{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
{"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
 {code}
A few things to note here:
 # Manually calling commitSync in the onPartitionsRevoked callback seems to 
alleviate the issue.
 # Setting includeMetadataInTimeout to false also seems to alleviate the issue.

These experiments seem to suggest that the contract between poll() and 
asyncCommit() is broken.  AFAIK, we implicitly use poll() as the ack of the 
previously fetched data, and the consumer would (try to) commit these offsets 
in the current poll() loop.  However, it seems that as poll() continues to 
loop, the "acked" data isn't being committed.

 

After the investigation, I believe this could have been introduced in 
https://issues.apache.org/jira/browse/KAFKA-14024, which originated from 
https://issues.apache.org/jira/browse/KAFKA-13310.

More specifically (see the comments below), the ConsumerCoordinator will always 
return before the async commit, due to the previous incomplete commit.  However, 
the requirements here are somewhat contradictory:
 # I think we want to commit asynchronously while the poll continues.
 # But we also need to commit all the acked offsets before revoking the 
partition.

 


> Flaky OffsetValidationTest seems to indicate potential duplication issue 
> during rebalance
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14196
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
