You are right: there is no "synchronous subscribe" method in Kafka.
The consumer group only starts to work after `poll` is called.

Inside Kafka, there are test util methods like `awaitAssignment` and
`awaitRebalance` that wait for the initialization to complete, and they
do so by calling `poll` in a loop.
You can refer to this test to see how Kafka writes its tests internally:
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
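
For completeness, a rough equivalent of that pattern in plain client
code, if you only want to block until the assignment has arrived
(`assignment()` is part of the public KafkaConsumer API; the topic name
and the 100 ms per-iteration timeout are placeholders):

    consumer.subscribe(Set.of(SOME_TOPIC));
    // poll until the group is joined and partitions are assigned; note
    // that a poll may already return records once the assignment happens
    while (consumer.assignment().isEmpty()) {
        var records = consumer.poll(Duration.ofMillis(100));
        // handle `records` here if they must not be dropped
    }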

As for the feature request, starting the rebalance and other metadata
updates during `subscribe` would be a big change, so it would need a
more solid motivation. You are welcome to propose this feature through
the KIP process:
https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals

Thank you.
Luke

On Tue, Jan 25, 2022 at 12:27 PM Paul Whalen <pgwha...@gmail.com> wrote:

> Faced with the same problem, we've done the following:
>  - Find and save the latest offset(s) using consumer.endOffsets()
>  - consumer.poll() and process records in a while loop until you've
> processed up through the saved offsets
>
> Notice that it doesn't matter how long the poll duration is: you've set
> your watermark and will read until you get there, no matter how long it
> takes.  And you know you're going to get there eventually (assuming the
> cluster is healthy), because you know the offsets exist.
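>
> A minimal sketch of that loop (SOME_TOPIC and process() are
> placeholders and imports are elided; endOffsets(), position(), and
> assign() are part of the public consumer API, and this assumes
> auto.offset.reset=earliest so the consumer starts from the beginning):
>
>     // save the watermark before consuming
>     var partitions = consumer.partitionsFor(SOME_TOPIC).stream()
>         .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
>         .collect(Collectors.toList());
>     var endOffsets = consumer.endOffsets(partitions);
>     consumer.assign(partitions);
>     // poll until every partition's position reaches its end offset
>     while (!partitions.stream()
>             .allMatch(tp -> consumer.position(tp) >= endOffsets.get(tp))) {
>         for (var rec : consumer.poll(Duration.ofMillis(100))) {
>             process(rec); // placeholder for the actual assertions
>         }
>     }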
>
> Hope that helps!
>
> Paul
>
> On Mon, Jan 24, 2022 at 10:12 PM Sebastian Proksch <sebast...@proks.ch>
> wrote:
>
> > Hey all! I am actually not sure whether this is a bug report or a
> > feature request; maybe you can help me understand whether I am
> > missing the obvious.
> >
> > I am using Apache Kafka in one of my Java projects (using
> > apache-kafka-clients-3.1.0). In an integration test, I would like to
> > check that (after running some producers) a bunch of topics have the
> > expected contents. As such, the whole pipeline is filled and "standing
> > still" and I am connecting a single new consumer that alone forms a
> > new consumer group. I would like to iterate over all topics
> > (`KafkaConsumer.listTopics()`) and one by one consume all messages of
> > every topic.
> >
> > As far as I understand, a subscription on a new topic will initialize
> > a new consumer group and it takes a bit of time to join the group and
> > to rebalance the partitions. However, I would expect this logic to
> > block, so I find the following behavior unexpected...
> >
> >     var con = ... // get consumer instance
> >     con.subscribe(Set.of(SOME_TOPIC));
> >     var records = con.poll(Duration.ZERO);
> >
> > The poll returns an empty `ConsumerRecords`, even though there *IS*
> > data in said topic. Even when I register a `ConsumerRebalanceListener`
> > in the subscribe call, I never see any assignment of
> > `TopicPartition`s to the consumer, not even a delayed one.
> >
> > On the other hand, when I change the code to
> >
> >     var con = ... // get consumer instance
> >     con.subscribe(Set.of(SOME_TOPIC));
> >     var records = con.poll(Duration.ofMillis(100));
> >
> > I now get actual records. Also, when I register a
> > `ConsumerRebalanceListener`, I receive an `onPartitionsAssigned`
> > notification.
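> >
> > For reference, this is the kind of listener registration I mean (a
> > minimal sketch; the println stands in for a real check):
> >
> >     con.subscribe(Set.of(SOME_TOPIC), new ConsumerRebalanceListener() {
> >         @Override
> >         public void onPartitionsAssigned(Collection<TopicPartition> ps) {
> >             System.out.println("assigned: " + ps);
> >         }
> >         @Override
> >         public void onPartitionsRevoked(Collection<TopicPartition> ps) {
> >         }
> >     });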
> >
> > I really dislike the idea of introducing magic numbers like the 100ms
> > into my tests: if the number is too small and the assignment takes
> > longer for larger topics, my tests will break. If it is too large,
> > the tests will slow down unnecessarily. The functionality that I am
> > actually looking for is a synchronous version of the `subscribe`
> > method or some other way to block execution until my client has
> > finished joining the group and the partitions have been rebalanced.
> > It feels like this should be the default behavior.
> >
> > Am I completely off with my expectations for the behavior of the
> > `subscribe` method or am I missing something? Is there a way to
> > achieve said behavior with the current clients? Maybe my code just
> > lacks the right config parameter...
> >
> > Thanks for your help, any pointer is appreciated!
> >
> > best
> > Sebastian
> >
>
