Hey all! I am actually not sure whether this is a bug report or a
feature request; maybe you can help me understand whether I am missing
something obvious.

I am using Apache Kafka in one of my Java projects (using
apache-kafka-clients-3.1.0). In an integration test, I would like to
check that, after running some producers, a bunch of topics have the
expected contents. At that point the whole pipeline is filled and
"standing still", and I connect a single new consumer that forms a
new consumer group on its own. I would like to iterate over all topics
(`KafkaConsumer.listTopics()`) and consume all messages of every
topic, one topic at a time.
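For the "consume all messages" part, one possible stopping criterion
(a sketch, not something the client offers out of the box) is to
snapshot `KafkaConsumer.endOffsets(...)` for the assigned partitions
and keep polling until `KafkaConsumer.position(...)` has reached that
end offset for every partition. The helper below only implements the
comparison. The class `CatchUp` and its method `caughtUp` are
hypothetical, and the generic key `K` stands in for `TopicPartition`
so the sketch compiles without the Kafka jar.

```java
import java.util.Map;

// Hypothetical helper: decides whether a consumer has read everything
// that was in its partitions when the end offsets were snapshotted.
final class CatchUp {
    private CatchUp() {}

    // K stands in for TopicPartition; values are offsets.
    static <K> boolean caughtUp(Map<K, Long> positions, Map<K, Long> endOffsets) {
        for (Map.Entry<K, Long> e : endOffsets.entrySet()) {
            Long pos = positions.get(e.getKey());
            // A missing position or one still below the end offset means
            // there may be unread records in that partition.
            if (pos == null || pos < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

In the test, `positions` would come from calling `con.position(tp)`
for each `tp` in `con.assignment()`, and `endOffsets` from a single
`con.endOffsets(con.assignment())` call made once the assignment is
in place.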

As far as I understand, a subscription on a new topic will initialize
a new consumer group, and it takes a bit of time to join the group and
to rebalance the partitions. However, I would expect this logic to be
blocking, and I find the following behavior unexpected:

    var con = ... // get consumer instance
    con.subscribe(Set.of(SOME_TOPIC));
    var records = con.poll(Duration.ZERO);

The poll returns an empty `ConsumerRecords` object, even though there
*IS* data in that topic. Even when I register a
`ConsumerRebalanceListener` in the subscribe call, I never see any
`TopicPartition` being assigned to the consumer, not even a delayed
one.

On the other hand, when I change the code to

    var con = ... // get consumer instance
    con.subscribe(Set.of(SOME_TOPIC));
    var records = con.poll(Duration.ofMillis(100));

I now get actual records. Also, when I register a
`ConsumerRebalanceListener`, I receive an `onPartitionsAssigned`
notification.

I really dislike the idea of introducing magic numbers like the 100 ms
into my tests: if the number is too small and the assignment takes
longer for larger topics, my tests will break; if it is too large, it
will slow the tests down unnecessarily. What I am actually looking
for is a synchronous version of the `subscribe` method, or some other
way to block execution until my client has finished joining the group
and the partitions have been assigned. It feels like this should be
the default behavior.
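One way to avoid a single magic timeout is to poll in short rounds
until a condition holds, bounded by a generous overall deadline. This
is a sketch that assumes the group join is driven from within `poll`,
which is also where `ConsumerRebalanceListener` callbacks are invoked.
The class `PollUntil` and its method are hypothetical helpers, not
part of kafka-clients.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: keeps calling pollOnce until done returns true
// or the overall timeout has elapsed. Returns true if done was reached.
final class PollUntil {
    private PollUntil() {}

    static boolean pollUntil(Runnable pollOnce, BooleanSupplier done, Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!done.getAsBoolean()) {
            // Compare via subtraction so the check tolerates nanoTime overflow.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            pollOnce.run(); // e.g. one short consumer.poll(...) round
        }
        return true;
    }
}
```

With the consumer from above, a call could look like
`PollUntil.pollUntil(() -> con.poll(Duration.ofMillis(100)).forEach(collected::add), () -> !con.assignment().isEmpty(), Duration.ofSeconds(30))`,
where `collected` is a hypothetical `List<ConsumerRecord<String, String>>`.
Records returned while waiting are kept instead of dropped. The short
per-round timeout then only affects latency, and the long deadline is
only reached when something is actually wrong.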

Am I completely off in my expectations for the behavior of the
`subscribe` method, or am I missing something? Is there a way to
achieve this behavior with the current clients? Maybe my code just
lacks the right config parameter...

Thanks for your help, any pointer is appreciated!

best
Sebastian
