Thank you! Will take a look at the PR.

On Fri, May 5, 2023 at 11:56 PM Erik van Oosten
<e.vanoos...@grons.nl.invalid> wrote:

> Thanks!
>
> Here is Tom Lee's PR recreated on trunk:
> https://github.com/apache/kafka/pull/13678
>
> I believe that this PR might not be complete, though. When we only call
> commitAsync (repeatedly) from the rebalance listener callback method,
> will the client's poll method ever be invoked? I suspect that no polling
> takes place in this scenario and that async commits will still not
> complete. With the changes of this PR, commitSync can be used as a
> workaround.
>
> I guess we can fix this by adding `client.pollNoWakeup()`, e.g. at the
> start of `ConsumerCoordinator.commitOffsetsAsync`. Is that an acceptable
> change?
>
> Kind regards,
>      Erik.
>
>
> On 05-05-2023 at 20:20, Philip Nee wrote:
> > Hey Eric,
> >
> > Maybe its more straightforward to open a new PR.
> >
> > Thanks!
> > P
> > On Fri, May 5, 2023 at 9:36 AM Erik van Oosten <e.vanoos...@grons.nl> wrote:
> >
> >> If I were to rebase the old pull request and re-open KAFKA-10337, would
> >> it be considered for merging?
> >>
> >> Kind regards,
> >>       Erik.
> >>
> >>
> >> On 03-05-2023 at 09:21, Erik van Oosten wrote:
> >>> Hi Philip,
> >>>
> >>>> Firstly, could you explain the situation
> >>>> in which you would prefer to invoke commitAsync over commitSync in the
> >>>> rebalance listener?
> >>> Of course!
> >>>
> >>> Short answer: we prefer commitAsync because we want to handle multiple
> >>> partitions concurrently using the ZIO runtime.
> >>>
> >>> Long answer: this is in the context of zio-kafka. In zio-kafka the
> >>> user writes code for a stream that processes data and does commits. In
> >>> the library we intercept those commits and pass them to the
> >>> KafkaConsumer. We also keep track of the offsets of handed-out
> >>> records. Together this information allows us to track when a stream has
> >>> finished processing a partition and it is safe to start the rebalance.
> >>>
> >>> All of this happens concurrently and asynchronously using the ZIO
> >>> runtime. When we call commit inside the onPartitionsRevoked callback,
> >>> the Kafka client does a thread-id check: we can only call the
> >>> KafkaConsumer from the same thread that invoked us. This is unfortunate
> >>> because it forces us to spin up a specialized single-threaded ZIO
> >>> runtime inside the callback method. Though this runtime can run
> >>> blocking methods like commitSync, it needs careful programming since
> >>> all other tasks have to wait.
> >>>
> >>> (BTW, it would be great if there were an option to disable the
> >>> thread-id check. It has other use cases; see, for example, KAFKA-7143.)
> >>>
> >>>> is it your concern that we
> >>>> currently don't have a way to invoke the callback, and the user won't
> >>>> be able to correctly handle these failed/successful async commits?
> >>> Yes, that is correct.
> >>>
> >>>> Sorry, I dug a bit into the old PR. It seems the issue is a broken
> >>>> contract: commitSync won't wait for the previous async commits to
> >>>> complete if it is asked to commit an empty offset map.
> >>> Indeed! I am assuming the same is true for commitAsync. The important
> >>> thing is that we need something that triggers those callbacks. I would
> >>> prefer commitAsync, but if only commitSync gets fixed we can use that
> >>> as well. If there is an entirely different method for this task, that
> >>> would be good too.
> >>>
> >>> Kind regards,
> >>>      Erik.
> >>>
> >>>
> >>> Philip Nee wrote on 2023-05-02 21:49:
> >>>> Hey Erik,
> >>>>
> >>>> Just a couple of questions for you. Firstly, could you explain the
> >>>> situation in which you would prefer to invoke commitAsync over
> >>>> commitSync in the rebalance listener? Typically we would use the
> >>>> synchronous method to ensure the commits are completed before moving
> >>>> on with the rebalancing, which leads to my second comment/question. Is
> >>>> it your concern that we currently don't have a way to invoke the
> >>>> callback, and the user won't be able to correctly handle these
> >>>> failed/successful async commits?
> >>>>
> >>>> Thanks,
> >>>> P
> >>>>
> >>>> On Tue, May 2, 2023 at 12:22 PM Erik van Oosten
> >>>> <e.vanoos...@grons.nl.invalid> wrote:
> >>>>
> >>>>> Dear developers of the Kafka Java client,
> >>>>>
> >>>>> It seems I have found a feature gap in the Kafka Java client.
> >>>>> KAFKA-10337 and its associated pull request on GitHub (from 2020!)
> >>>>> would solve this, but it was closed without merging. We would love to
> >>>>> see it reconsidered for merging. This mail lays out the arguments for
> >>>>> doing so.
> >>>>>
> >>>>> The javadoc of the `ConsumerRebalanceListener` method
> >>>>> `onPartitionsRevoked` recommends committing all offsets within the
> >>>>> method, thereby holding up the rebalance until those commits are done.
> >>>>> The (perceived) feature gap appears when the user tries to do async
> >>>>> commits from the rebalance listener: there is nothing available to
> >>>>> trigger the callbacks of completed commits. Without these callbacks,
> >>>>> there is no way to know when it is safe to return from
> >>>>> `onPartitionsRevoked`. (We cannot call `poll` because the rebalance
> >>>>> listener is already called from inside a `poll`.)
> >>>>>
> >>>>> Calling `commitAsync` with an empty offsets parameter seems a perfect
> >>>>> candidate for triggering callbacks of earlier commits. Unfortunately,
> >>>>> `commitAsync` doesn't behave that way. This is fixed by the pull
> >>>>> request mentioned above.
> >>>>>
> >>>>> The pull request conversation has a comment saying that calling
> >>>>> `commit` with an empty offsets parameter is not something that should
> >>>>> happen. I found this a strange thing to say. First of all, the method
> >>>>> does have special handling for this situation, negating the comment
> >>>>> outright. In addition, this special handling violates the contract of
> >>>>> the method (as specified in the javadoc section about ordering).
> >>>>> Therefore, this pull request has two advantages:
> >>>>>
> >>>>>   1. KafkaConsumer.commitAsync will be more in line with its javadoc,
> >>>>>   2. the feature gap will be closed.
> >>>>>
> >>>>> Of course, it might be that I missed something and that there are
> >>>>> other ways to trigger the commit callbacks. I would be very happy to
> >>>>> hear about that because it would mean I do not have to wait for a
> >>>>> release cycle.
> >>>>>
> >>>>> If you agree these arguments are sound, I would be happy to make the
> >>>>> pull request mergeable again.
> >>>>>
> >>>>> Curious about your thoughts, and kind regards,
> >>>>>       Erik.
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Erik van Oosten
> >>>>> e.vanoos...@grons.nl
> >>>>> https://day-to-day-stuff.blogspot.com
> >>>>> Committer of zio-kafka https://github.com/zio/zio-kafka
> >>>>>
> >>
> >>
>
>
