Hi Sebastian,

I've run into this as well during integration tests and took a different approach. When I need to verify the contents of topics with a single consumer, I use the `assign` method instead of `subscribe`. You do need to know the number of partitions of the topic, as the argument is a collection of `TopicPartition`s. It took a bit of effort to rewrite the tests, but it was worth it because it saves several seconds of rebalancing per test.
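As a rough sketch of that approach (not the exact test code): the helper below assigns all partitions of a topic manually and reads until the consumer's position reaches the end offsets captured at the start. `TopicReader` and `readAll` are made-up names for illustration, and the sketch assumes the topic exists and that nothing is producing to it while the read runs. It uses `partitionsFor` to look up the partitions rather than hard-coding their number.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Hypothetical test helper; the class and method names are illustrative only.
public final class TopicReader {

    private TopicReader() {}

    /** Reads every record currently in the topic without joining a consumer group. */
    public static <K, V> List<ConsumerRecord<K, V>> readAll(Consumer<K, V> consumer, String topic) {
        // Look up the topic's partitions (assumes the topic exists).
        List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());

        // Manual assignment: no group join, so no rebalance to wait for.
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        // Snapshot of the end offsets (assumes the pipeline is idle).
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

        List<ConsumerRecord<K, V>> result = new ArrayList<>();
        while (!reachedEnd(consumer, endOffsets)) {
            // The timeout only bounds a single fetch; the loop ends on offsets, not on time.
            consumer.poll(Duration.ofMillis(200)).forEach(result::add);
        }
        return result;
    }

    // True once the position of every assigned partition has reached its end offset.
    private static boolean reachedEnd(Consumer<?, ?> consumer, Map<TopicPartition, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> consumer.position(e.getKey()) >= e.getValue());
    }
}
```

Because the loop stops on offsets rather than on elapsed time, the poll timeout is not a magic number the test depends on, and an empty topic yields an empty list without any polling.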
I hope this helps you a bit.

Kind regards,
Richard Bosch

On Tue, Jan 25, 2022 at 5:12 AM Sebastian Proksch <sebast...@proks.ch> wrote:

> Hey all! I am actually not sure whether this is a bug report or a
> feature request, maybe you can help me understand whether I am missing
> the obvious.
>
> I am using Apache Kafka in one of my Java projects (using
> apache-kafka-clients-3.1.0). In an integration test, I would like to
> check that (after running some producers) a bunch of topics have the
> expected contents. As such, the whole pipeline is filled and "standing
> still" and I am connecting a single new consumer that alone forms a
> new consumer group. I would like to iterate over all topics
> (`KafkaConsumer.listTopics()`) and one by one consume all messages of
> every topic.
>
> As far as I understand, a subscription on a new topic will initialize
> a new consumer group and it takes a bit of time to join the group and
> to rebalance the partitions. However, I would expect that this logic
> is blocking and I find the following behavior unexpected...
>
>     var con = ... // get consumer instance
>     con.subscribe(Set.of(SOME_TOPIC));
>     var records = con.poll(Duration.ZERO)
>
> The poll will return an empty records array, even though there *IS*
> data in said topic. Even when I register a `ConsumerRebalanceListener`
> in the subscribe call, I won't ever see any assignment of
> `TopicPartition` to the consumer, not even a delayed one.
>
> On the other hand, when I change the code to
>
>     var con = ... // get consumer instance
>     con.subscribe(Set.of(SOME_TOPIC));
>     var records = con.poll(Duration.ofMillis(100));
>
> I now get actual records. Also, when I register a
> `ConsumerRebalanceListener`, I receive an `onPartitionsAssigned`
> notification.
>
> I really dislike the idea of introducing magic numbers like the 100ms
> to my tests... if the number is too small and the assignment takes
> longer for larger topics, my tests will break. Too large numbers will
> slow the tests down unnecessarily. The functionality that I am
> actually looking for is a synchronous version of the `subscribe`
> method or some other way to block execution until my client has
> finished joining the group and the partitions are rebalanced. It feels
> like this should be default behavior.
>
> Am I completely off with my expectations for the behavior of the
> `subscribe` method or am I missing something? Is there a way to
> achieve said behavior with the current clients? Maybe my code just
> lacks the right config parameter...
>
> Thanks for your help, any pointer is appreciated!
>
> best
> Sebastian