[ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843887#comment-17843887
 ] 

Lianet Magrans edited comment on KAFKA-16670 at 5/6/24 9:01 PM:
----------------------------------------------------------------

Hey [~chickenchickenlove], thanks for trying this out! Some clarification in 
case it helps. In the flow you described, the new consumer sends a request to 
find the group coordinator (FindCoordinator) when it gets created. Even if 
there's a call to consumer.subscribe right after, it won't send a request to 
subscribe (HeartbeatRequest) until it gets a response to that initial 
FindCoordinator request (HeartbeatRequestManager skips sending requests if it 
does not know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response to the FindCoordinator request, a 
HeartbeatRequest containing the new subscription is sent. The consumer will 
then eventually receive the assignment, but from the consumer's point of view 
we don't know exactly when. The rebalance callbacks are what signal to the 
consumer that the call to subscribe completed with an assignment received. So 
it's only after the consumer gets the assignment that a call to poll can 
return the records that are available. 
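
To make the ordering above concrete, here is a toy model of it in plain Java. 
It is only a sketch: the class and method names (CoordinatorGatingSketch, 
backgroundTick, onFindCoordinatorResponse, onAssignmentReceived) are made up 
for illustration and are not the real client internals. It only shows that the 
heartbeat carrying the subscription is held back until the coordinator is 
known, and that poll returns nothing until the assignment has arrived.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the request ordering; hypothetical names, not Kafka's classes. */
final class CoordinatorGatingSketch {
    private boolean coordinatorKnown = false;    // true once FindCoordinator is answered
    private boolean assignmentReceived = false;  // true once an assignment arrives
    private String subscription = null;          // topic passed to subscribe()
    final List<String> sentRequests = new ArrayList<>();

    CoordinatorGatingSketch() {
        // The consumer looks for the group coordinator as soon as it is created.
        sentRequests.add("FindCoordinator");
    }

    void subscribe(String topic) {
        // Only records the subscription; nothing is sent from here.
        subscription = topic;
    }

    void onFindCoordinatorResponse() {
        coordinatorKnown = true;
    }

    void onAssignmentReceived() {
        assignmentReceived = true;
    }

    /** One pass of the (modelled) background thread. */
    void backgroundTick() {
        if (!coordinatorKnown || subscription == null) {
            return; // heartbeat is skipped while the coordinator is unknown
        }
        sentRequests.add("Heartbeat(" + subscription + ")");
    }

    /** Before the assignment there is nothing to fetch, so poll returns empty. */
    List<String> poll(List<String> availableRecords) {
        return assignmentReceived ? availableRecords : List.of();
    }
}
```

In this model, calling subscribe immediately after construction is harmless: 
the subscription just waits until the FindCoordinator response arrives. 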

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if the difference is 
that in the failed case you're polling only 10 times (vs. 100 times in the 
successful case)? In order to receive records, we do need to make sure that we 
are calling poll after the assignment has been received (so the consumer 
issues a fetch request for the partitions assigned). Note that even when you 
poll for 1s in your test, a poll that happens before the assignment has been 
received will block for 1s but is doomed to return empty, because it is not 
waiting for records from the topics you're interested in (no partitions 
assigned yet). Could you make sure that the test is calling poll after the 
assignment has been received? (I would suggest just polling in a loop for a 
certain amount of time; no sleep is needed after the subscribe.) 
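
As a rough sketch of the "poll in a loop for a certain amount of time" idea, 
something like the helper below could replace the fixed number of polls. The 
helper name (pollUntil) and its parameters are my own, not part of the client 
API. It takes the poll call as a function so that it stays independent of the 
test setup, and it stops either when the expected number of records has been 
received or when the deadline passes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical test helper: keep polling until enough records or a deadline. */
final class PollUntilDeadline {

    /**
     * Calls poller repeatedly with a short per-call timeout. With a real
     * consumer the poller could be, for example:
     *   t -> { List<ConsumerRecord<String, String>> batch = new ArrayList<>();
     *          consumer.poll(t).forEach(batch::add);
     *          return batch; }
     */
    static <R> List<R> pollUntil(Function<Duration, List<R>> poller,
                                 int expectedRecords,
                                 Duration overallTimeout) {
        Instant deadline = Instant.now().plus(overallTimeout);
        List<R> received = new ArrayList<>();
        // Polls issued before the assignment arrives simply return empty;
        // the loop keeps going so that later polls can fetch records.
        while (received.size() < expectedRecords && Instant.now().isBefore(deadline)) {
            received.addAll(poller.apply(Duration.ofMillis(100)));
        }
        return received;
    }
}
```

Bounding the loop by time rather than by a poll count means the test does not 
depend on how many polls happen to complete before the assignment is received. 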

This integration test for the consumer 
[testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153]
 has very similar logic to what you're trying to achieve (create consumer, 
subscribe right away and consume), and since a new broker and consumer are 
set up for each test, the test goes down the same path of having to find a 
coordinator before sending the HeartbeatRequest with a subscription. Comparing 
the two, the main difference seems to be the limited number of polls in your 
failed test scenario, so let's try to rule that out to better isolate the 
situation. Hope it helps! Let me know.


> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16670
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16670
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: sanghyeok An
>            Priority: Major
>
> *Related Code*
>  * Consumer gets the assignment successfully:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
>  * Consumer gets stuck forever because of a concurrency issue:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>  
> *Unexpected behaviour*
>  * The broker is sufficiently slow.
>  * A KafkaConsumer is created and immediately subscribes to a topic.
>
> If both conditions are met, the {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and may become stuck indefinitely. In the case 
> of a new {{broker}} and a new {{{}consumer{}}}, when the consumer is created, 
> the {{consumer background thread}} starts sending a request to the broker (I 
> believe this is a {{{}GroupCoordinator Heartbeat request{}}}). During this 
> time, if the {{broker}} has not yet loaded metadata from 
> {{{}__consumer_offsets{}}}, it will schedule the metadata loading. Once 
> the broker has completely loaded the metadata, the {{consumer background 
> thread}} recognizes this broker as a valid group coordinator. However, the 
> {{consumer}} can send a {{subscribe request}} to the {{broker}} before the 
> {{broker}} has replied to the {{{}GroupCoordinator Heartbeat Request{}}}. In 
> such a scenario, the {{consumer}} appears to be stuck.
>  
> You can check this scenario in 
> {{{}src/test/java/com/example/MyTest#should_fail_because_consumer_try_to_poll_before_background_thread_get_valid_coordinator{}}}.
>  If there is no sleep to wait for the {{{}GroupCoordinator Heartbeat 
> Request{}}}, the {{consumer}} always gets stuck. With a short sleep, the 
> {{consumer}} always receives the assignment.
>  
> README : 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/main/README.md]
>  
> In my case, the consumer gets the assignment with `docker-compose`, which 
> means that setup is not slow enough to trigger the issue. 
> However, the consumer cannot get the assignment with `testcontainers` unless 
> there is a short wait, which means that setup is slow enough to cause the 
> concurrency issue. 
> `testcontainers` is docker in docker, so it will be slower than 
> `docker-compose`. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
