Yes, there is no such "synchronous subscribe" method in Kafka. Consumer group will start to work after `poll` is called.
Inside Kafka, there are util methods like `awaitAssignment` or `awaitRebalance` to wait for the initialization completed for tests, which is checking with `poll` You can refer to this test to learn how Kafka write tests internally: https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala For the feature request, I think it will be a big change if we want to start the rebalance and other metadata update during `subscribe`. So, it might need a more solid motivation for this change. Welcome to propose this feature with KIP process: https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals Thank you. Luke On Tue, Jan 25, 2022 at 12:27 PM Paul Whalen <pgwha...@gmail.com> wrote: > Faced with the same problem, we've done the following: > - Find and save the latest offset(s) using consumer.endOffsets() > - consumer.poll() and process records in a while loop until you've > processed up through the saved offsets > > Notice that it doesn't matter how long the poll duration is: you've set > your watermark and will read until you get there, no matter how long it > takes. And you know you're going to get there eventually (assuming the > cluster is healthy), because you know the offsets exist. > > Hope that helps! > > Paul > > On Mon, Jan 24, 2022 at 10:12 PM Sebastian Proksch <sebast...@proks.ch> > wrote: > > > Hey all! I am actually not sure whether this is a bug report or a > > feature request, maybe you can help me understand whether I am missing > > the obvious. > > > > I am using Apache Kafka in one of my Java projects (using > > apache-kafka-clients-3.1.0). In an integration test, I would like to > > check that (after running some producers) a bunch of topics have the > > expected contents. As such, the whole pipeline is filled and "standing > > still" and I am connecting a single new consumer that alone forms a > > new consumer group. I would like to iterate over all topics > > (`KafkaConsumer.listTopics()`) and one by one consume all messages of > > every topic. > > > > As far as I understand, a subscription on a new topic will initialize > > a new consumer group and it takes a bit of time to join the group and > > to rebalance the partitions. However, I would expect that this logic > > is blocking and I find the following behavior unexpected... > > > > var con = ... // get consumer instance > > con.subscribe(Set.of(SOME_TOPIC)); > > var records = con.poll(Duration.ZERO) > > > > The poll will return an empty records array, even though there *IS* > > data in said topic. Even when I register a `ConsumerRebalanceListener` > > in the subscribe call, I won't ever see any assignment of > > `TopicPartition` to the consumer, not even a delayed one. > > > > On the other hand, when I change the code to > > > > var con = ... // get consumer instance > > con.subscribe(Set.of(SOME_TOPIC)); > > var records = con.poll(Duration.ofMillis(100)); > > > > I now get actual records. Also, when I register a > > `ConsumerRebalanceListener`, I receive an `onPartitionsAssigned` > > notification. > > > > I really dislike the idea of introducing magic numbers like the 100ms > > to my tests... if the number is too small and the assignment takes > > longer for larger topics, my tests will break. Too large numbers will > > slow the tests down unnecessarily. The functionality that I am > > actually looking for is a synchronous version of the `subscribe` > > method or some other way to block execution until my client has > > finished joining the group and the partitions are rebalanced. It feels > > like this should be default behavior. > > > > Am I completely off with my expectations for the behavior of the > > `subscribe` method or am I missing something? Is there a way to > > achieve said behavior with the current clients? Maybe my code just > > lacks the right config parameter... > > > > Thanks for your help, any pointer is appreciated! > > > > best > > Sebastian > > >