Hi Sebastian,

I've run into this as well in integration tests and took a different
approach. When I need to verify the contents of topics with a single
consumer, I use the `assign` method instead of `subscribe`. You do need
to know the number of partitions of the topic, because `assign` takes a
collection of `TopicPartition`s. Rewriting the tests took a bit of
effort, but it was worth it because it saves several seconds of
rebalancing per test.
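
A minimal sketch of the assign-based approach, assuming the Java client's `Consumer` interface from kafka-clients 3.x. The class `TopicDrain`, the method `readAll` and the 100 ms per-poll wait are hypothetical choices for illustration. Rather than hard-coding the partition count, it looks the partitions up with `partitionsFor`, assigns all of them, seeks to the beginning and polls until every partition's position has reached the end offset captured before reading. No group membership is involved, so there is no rebalance to wait for.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for tests; not part of the Kafka client API.
public final class TopicDrain {

    /**
     * Reads every record currently in {@code topic} using manual assignment
     * (no consumer group, hence no rebalance). Stops once each partition's
     * position reaches the end offset observed before reading started.
     */
    public static <K, V> List<ConsumerRecord<K, V>> readAll(
            Consumer<K, V> consumer, String topic, Duration timeout) {
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos == null || infos.isEmpty()) {
            return List.of(); // unknown topic or no partitions
        }
        // assign() expects a collection of TopicPartitions.
        List<TopicPartition> partitions = infos.stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());

        consumer.assign(partitions);          // replaces any previous assignment
        consumer.seekToBeginning(partitions); // lazy; applied on position()/poll()
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

        List<ConsumerRecord<K, V>> result = new ArrayList<>();
        Instant deadline = Instant.now().plus(timeout);
        while (!caughtUp(consumer, endOffsets)) {
            if (Instant.now().isAfter(deadline)) {
                throw new IllegalStateException("Timed out reading topic " + topic);
            }
            // 100 ms only bounds one iteration; it is not a correctness threshold.
            for (ConsumerRecord<K, V> rec : consumer.poll(Duration.ofMillis(100))) {
                result.add(rec);
            }
        }
        return result;
    }

    // True when every assigned partition has been read up to its end offset.
    private static boolean caughtUp(Consumer<?, ?> consumer,
                                    Map<TopicPartition, Long> endOffsets) {
        for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
            if (consumer.position(e.getKey()) < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

The loop exits as soon as every partition is caught up, so the overall `timeout` only guards against a hanging test. Records produced after `endOffsets` is called are not waited for. Calling `readAll` once per topic returned by `listTopics()` gives the one-topic-at-a-time check from the original question, since each `assign` call replaces the previous assignment.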

I hope this helps you a bit.

Kind regards,


Richard Bosch




On Tue, Jan 25, 2022 at 5:12 AM Sebastian Proksch <sebast...@proks.ch>
wrote:

> Hey all! I am actually not sure whether this is a bug report or a
> feature request; maybe you can help me understand whether I am
> missing something obvious.
>
> I am using Apache Kafka in one of my Java projects (using
> apache-kafka-clients-3.1.0). In an integration test, I would like to
> check that (after running some producers) a bunch of topics have the
> expected contents. At that point, the whole pipeline is filled and
> "standing still", and I connect a single new consumer that forms a
> new consumer group on its own. I would like to iterate over all topics
> (`KafkaConsumer.listTopics()`) and consume all messages of every
> topic, one topic at a time.
>
> As far as I understand, a subscription to a new topic initializes a
> new consumer group, and it takes a bit of time to join the group and
> to rebalance the partitions. However, I would expect this logic to be
> blocking, so I find the following behavior unexpected:
>
>     var con = ... // get consumer instance
>     con.subscribe(Set.of(SOME_TOPIC));
>     var records = con.poll(Duration.ZERO);
>
> The poll returns an empty `ConsumerRecords` object, even though there
> *IS* data in said topic. Even when I register a
> `ConsumerRebalanceListener` in the subscribe call, I never see any
> assignment of `TopicPartition` to the consumer, not even a delayed one.
>
> On the other hand, when I change the code to
>
>     var con = ... // get consumer instance
>     con.subscribe(Set.of(SOME_TOPIC));
>     var records = con.poll(Duration.ofMillis(100));
>
> I now get actual records. Also, when I register a
> `ConsumerRebalanceListener`, I receive an `onPartitionsAssigned`
> notification.
>
> I really dislike the idea of introducing magic numbers like the 100 ms
> into my tests: if the number is too small and the assignment takes
> longer for larger topics, my tests will break, and numbers that are
> too large slow the tests down unnecessarily. The functionality that I
> am actually looking for is a synchronous version of the `subscribe`
> method, or some other way to block execution until my client has
> finished joining the group and the partitions are rebalanced. It feels
> like this should be the default behavior.
>
> Am I completely off with my expectations for the behavior of the
> `subscribe` method or am I missing something? Is there a way to
> achieve said behavior with the current clients? Maybe my code just
> lacks the right config parameter...
>
> Thanks for your help, any pointer is appreciated!
>
> best
> Sebastian
>
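
If you prefer to keep `subscribe`, one alternative to a fixed sleep is to keep polling until the group join has produced an assignment, with a generous upper bound instead of a tuned magic number. Below is a minimal sketch of such a helper in plain Java, so it can be tried without a broker; `PollUntil.pollUntil` is a hypothetical name and not part of the Kafka API. It calls a poll function repeatedly, keeps everything returned, stops as soon as a condition holds, and throws once the deadline passes.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical test helper; not part of the Kafka client API.
public final class PollUntil {

    /**
     * Calls {@code pollOnce} until {@code done} returns true, collecting
     * everything it returns. The timeout is only an upper bound: the loop
     * exits as soon as the condition holds.
     */
    public static <T> List<T> pollUntil(Supplier<? extends Iterable<? extends T>> pollOnce,
                                        BooleanSupplier done,
                                        Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> collected = new ArrayList<>();
        while (!done.getAsBoolean()) {
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            // Keep what each poll returns so nothing is lost while waiting.
            for (T item : pollOnce.get()) {
                collected.add(item);
            }
        }
        return collected;
    }
}
```

With a Kafka consumer, `pollOnce` could be `() -> consumer.poll(Duration.ofMillis(100))` and `done` could be `() -> !consumer.assignment().isEmpty()`, with a timeout such as 30 seconds that only caps how long a broken test can hang. Keeping the returned items matters because the poll that completes the join can already return records. This assumes the consumer uses `auto.offset.reset=earliest`; with the default (`latest`), a brand-new group starts at the end of each partition and does not see records produced before it joined.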
