[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-----------------------------------------------------------------

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps!


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps

> KIP-848 does not work well
> --------------------------
>
>                 Key: KAFKA-16637
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16637
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: sanghyeok An
>            Assignee: Kirk True
>            Priority: Minor
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>         Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to