[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842470#comment-17842470 ]
Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-----------------------------------------------------------------

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? Are there no other consumers subscribed to it that could be owning the partition?

I tried your code again with 1 topic, 1 partition, and 1 instance of your consumer app running with a poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity:
{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote}
and produced a bunch of messages with:
bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected.

If you're still facing issues after double-checking that, I would suggest passing a ConsumerRebalanceListener to the subscribe call, just to check/print the partitions assigned to your consumer in the onPartitionsAssigned callback. It would also help to take a look at the broker logs and share them, to understand more about what's going on in your setup.

Hope it helps!

> KIP-848 does not work well
> --------------------------
>
>                 Key: KAFKA-16637
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16637
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: sanghyeok An
>            Assignee: Kirk True
>            Priority: Minor
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>         Attachments: image-2024-04-30-08-33-06-367.png, image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>
> However, it does not work well.
> You can check my setup below.
>
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>
> *Broker logs*
> broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here

--
This message was sent by Atlassian Jira
(v8.20.10#820010)