Faced with the same problem, we've done the following:
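 - Find and save the latest offset(s) using consumer.endOffsets()
 - Call consumer.poll() and process records in a while loop until you've
   processed everything up to the saved offsets (an end offset is the
   offset of the next record to be written, i.e. one past the last record)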
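
Here is a minimal Java sketch of those two steps, written against the kafka-clients `Consumer` interface and assuming kafka-clients 3.x on the classpath. `ReadToEnd` and `readToEnd` are hypothetical names. The reply doesn't say whether the consumer subscribes or assigns, so the sketch leaves that to the caller. With `subscribe()`, it assumes `auto.offset.reset=earliest` (a brand-new group otherwise starts at the end) and that this consumer is the only group member, so it eventually owns every partition. With `assign()`, it assumes the caller has already called `seekToBeginning()`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: reads all records up to the end offsets captured at call time.
public final class ReadToEnd {

    public static <K, V> List<ConsumerRecord<K, V>> readToEnd(
            Consumer<K, V> consumer, Collection<TopicPartition> partitions, Duration pollTimeout) {
        // Step 1: save the watermark. Each end offset is one past the last existing record.
        Map<TopicPartition, Long> watermark = consumer.endOffsets(partitions);

        List<ConsumerRecord<K, V>> seen = new ArrayList<>();
        // Step 2: keep polling until every partition's position reaches its watermark.
        // pollTimeout only controls how often the loop re-checks, not when it stops.
        while (!caughtUp(consumer, watermark)) {
            for (ConsumerRecord<K, V> record : consumer.poll(pollTimeout)) {
                seen.add(record); // replace with real processing or test assertions
            }
        }
        return seen;
    }

    private static boolean caughtUp(Consumer<?, ?> consumer, Map<TopicPartition, Long> watermark) {
        // With subscribe(), the assignment stays empty until the group join completes.
        if (!consumer.assignment().containsAll(watermark.keySet())) {
            return false;
        }
        for (Map.Entry<TopicPartition, Long> e : watermark.entrySet()) {
            // position() is the offset of the next record poll() would fetch; using it
            // (rather than record.offset() + 1) avoids stalling on offsets that never
            // surface as records, such as transaction markers.
            if (consumer.position(e.getKey()) < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

For the integration test in the quoted message, this would be called once per topic, building the partition list from consumer.partitionsFor(topic). Any positive poll timeout works. A longer one only means fewer loop iterations, which is exactly why the poll duration stops mattering.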

Notice that it doesn't matter how long the poll duration is: you've set
your watermark and will read until you get there, no matter how long it
takes.  And you know you're going to get there eventually (assuming the
cluster is healthy), because you know the offsets exist.

Hope that helps!

Paul

On Mon, Jan 24, 2022 at 10:12 PM Sebastian Proksch <sebast...@proks.ch>
wrote:

> Hey all! I am actually not sure whether this is a bug report or a
> feature request, maybe you can help me understand whether I am missing
> the obvious.
>
> I am using Apache Kafka in one of my Java projects (using
> apache-kafka-clients-3.1.0). In an integration test, I would like to
> check that (after running some producers) a bunch of topics have the
> expected contents. As such, the whole pipeline is filled and "standing
> still" and I am connecting a single new consumer that alone forms a
> new consumer group. I would like to iterate over all topics
> (`KafkaConsumer.listTopics()`) and one by one consume all messages of
> every topic.
>
> As far as I understand, a subscription on a new topic will initialize
> a new consumer group and it takes a bit of time to join the group and
> to rebalance the partitions. However, I would expect that this logic
> is blocking and I find the following behavior unexpected...
>
>     var con = ... // get consumer instance
>     con.subscribe(Set.of(SOME_TOPIC));
>     var records = con.poll(Duration.ZERO);
>
> The poll will return an empty `ConsumerRecords`, even though there *IS*
> data in said topic. Even when I register a `ConsumerRebalanceListener`
> in the subscribe call, I won't ever see any assignment of
> `TopicPartition` to the consumer, not even a delayed one.
>
> On the other hand, when I change the code to
>
>     var con = ... // get consumer instance
>     con.subscribe(Set.of(SOME_TOPIC));
>     var records = con.poll(Duration.ofMillis(100));
>
> I now get actual records. Also, when I register a
> `ConsumerRebalanceListener`, I receive an `onPartitionsAssigned`
> notification.
>
> I really dislike the idea of introducing magic numbers like the 100ms
> to my tests... if the number is too small and the assignment takes
> longer for larger topics, my tests will break. Too large a number will
> slow the tests down unnecessarily. The functionality that I am
> actually looking for is a synchronous version of the `subscribe`
> method or some other way to block execution until my client has
> finished joining the group and the partitions are rebalanced. It feels
> like this should be the default behavior.
>
> Am I completely off with my expectations for the behavior of the
> `subscribe` method or am I missing something? Is there a way to
> achieve said behavior with the current clients? Maybe my code just
> lacks the right config parameter...
>
> Thanks for your help, any pointer is appreciated!
>
> best
> Sebastian
>
