Thank you! Will take a look at the PR.

On Fri, May 5, 2023 at 11:56 PM Erik van Oosten <e.vanoos...@grons.nl.invalid> wrote:
> Thanks!
>
> Here is Tom Lee's PR recreated on trunk:
> https://github.com/apache/kafka/pull/13678
>
> I believe that this PR might not be complete, though. If we only call commitAsync (repeatedly) from the rebalance listener callback method, will the client's poll method ever be invoked? I suspect that no polling takes place in this scenario and that async commits will still not complete. With the changes of this PR, commitSync can be used as a workaround.
>
> I guess we can fix this by adding `client.pollNoWakeup()`, e.g. at the start of `ConsumerCoordinator.commitOffsetsAsync`. Is that an acceptable change?
>
> Kind regards,
> Erik.
>
>
> On 05-05-2023 at 20:20, Philip Nee wrote:
> > Hey Erik,
> >
> > Maybe it's more straightforward to open a new PR.
> >
> > Thanks!
> > P
> > On Fri, May 5, 2023 at 9:36 AM Erik van Oosten <e.vanoos...@grons.nl> wrote:
> >
> >> If I were to rebase the old pull request and re-open KAFKA-10337, would it be considered for merging?
> >>
> >> Kind regards,
> >> Erik.
> >>
> >>
> >> On 03-05-2023 at 09:21, Erik van Oosten wrote:
> >>> Hi Philip,
> >>>
> >>>> Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener?
> >>> Of course!
> >>>
> >>> Short answer: we prefer commitAsync because we want to handle multiple partitions concurrently using the ZIO runtime.
> >>>
> >>> Long answer: this is in the context of zio-kafka. In zio-kafka the user writes code for a stream that processes data and does commits. In the library we intercept those commits and pass them to the KafkaConsumer. We also keep track of the offsets of handed-out records. Together, this information allows us to track when a stream is done processing a partition and that it is safe to start the rebalance.
> >>>
> >>> All of this happens concurrently and asynchronously using the ZIO runtime.
> >>> When calling commit inside the onPartitionsRevoked callback, the library does a thread-id check; we can only call the KafkaConsumer from the same thread that invoked us. This is unfortunate because it forces us to spin up a specialized single-threaded ZIO runtime inside the callback method. Though this runtime can run blocking methods like commitSync, it needs careful programming since all other tasks have to wait.
> >>>
> >>> (BTW, it would be great if there were an option to disable the thread-id check. It has more use cases; see for example KAFKA-7143.)
> >>>
> >>>> Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits?
> >>> Yes, that is correct.
> >>>
> >>>> Sorry, I dug a bit into the old PR. Seems like the issue is that there's a broken contract: commitSync won't wait for the previous async commits to complete if it tries to commit an empty offset map.
> >>> Indeed! I am assuming the same is true for commitAsync. The important thing is that we need something to trigger those callbacks. I would prefer commitAsync, but if only commitSync gets fixed we can use that as well. If there is a completely different method for this task, that would be good as well.
> >>>
> >>> Kind regards,
> >>> Erik.
> >>>
> >>>
> >>> Philip Nee wrote on 2023-05-02 21:49:
> >>>> Hey Erik,
> >>>>
> >>>> Just a couple of questions for you: Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Typically we would use the synchronous method to ensure the commits are completed before moving on with the rebalancing, which leads to my second comment/question.
> >>>> Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits?
> >>>>
> >>>> Thanks,
> >>>> P
> >>>>
> >>>> On Tue, May 2, 2023 at 12:22 PM Erik van Oosten <e.vanoos...@grons.nl.invalid> wrote:
> >>>>
> >>>>> Dear developers of the Kafka Java client,
> >>>>>
> >>>>> It seems I have found a feature gap in the Kafka Java client. KAFKA-10337 and its associated pull request on GitHub (from 2020!) would solve this, but the pull request was closed without merging. We would love to see it reconsidered for merging. This mail has the arguments for doing so.
> >>>>>
> >>>>> The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap arises when the user tries to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.)
> >>>>>
> >>>>> Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request.
> >>>>>
> >>>>> The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. First of all, the method does have special handling for this situation, negating the comment outright.
> >>>>> In addition, this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has 2 advantages:
> >>>>>
> >>>>> 1. KafkaConsumer.commitAsync will be more in line with its javadoc,
> >>>>> 2. the feature gap is gone.
> >>>>>
> >>>>> Of course, it might be that I missed something and that there are other ways to trigger the commit callbacks. I would be very happy to hear about that because it means I do not have to wait for a release cycle.
> >>>>>
> >>>>> If you agree these arguments are sound, I would be happy to make the pull request mergeable again.
> >>>>>
> >>>>> Curious about your thoughts, and kind regards,
> >>>>> Erik.
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Erik van Oosten
> >>>>> e.vanoos...@grons.nl
> >>>>> https://day-to-day-stuff.blogspot.com
> >>>>> Committer of zio-kafka https://github.com/zio/zio-kafka
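To make the ordering argument in this thread concrete, here is a toy, single-threaded model of the contract Erik cites from the `commitAsync` javadoc: async commit callbacks fire in invocation order, and a synchronous commit completes every earlier async commit before it returns, treating an empty offset map like any other commit (the behaviour KAFKA-10337 asks for). `ToyCommitter`, `CommitCallback` and their methods are hypothetical illustrations, not Kafka's API; there is no broker, so "completing" a commit just means recording it and firing its callback. It assumes Java 16+ (for the nested `record`).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of commit ordering; NOT Kafka's API. There is no broker:
// "completing" an async commit just means recording it and firing its callback.
final class ToyCommitter {

    // Hypothetical stand-in for Kafka's OffsetCommitCallback (offsets only, no errors).
    interface CommitCallback {
        void onComplete(Map<String, Long> offsets);
    }

    // An async commit that has been requested but not yet completed.
    private record Pending(Map<String, Long> offsets, CommitCallback callback) {}

    private final Deque<Pending> pending = new ArrayDeque<>();
    private final List<Map<String, Long>> committed = new ArrayList<>();

    // Queue an async commit; its callback fires only when poll() or commitSync() drives completion.
    void commitAsync(Map<String, Long> offsets, CommitCallback callback) {
        pending.addLast(new Pending(Map.copyOf(offsets), callback));
    }

    // Stand-in for the I/O that poll() performs: complete queued commits in invocation order.
    void poll() {
        drainPending();
    }

    // Modeled contract: every earlier async commit completes before this returns,
    // including when offsets is empty (the case KAFKA-10337 is about).
    void commitSync(Map<String, Long> offsets) {
        drainPending();
        if (!offsets.isEmpty()) {
            committed.add(Map.copyOf(offsets));
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<Map<String, Long>> committed() {
        return List.copyOf(committed);
    }

    // Complete pending commits oldest-first, firing each callback after recording the commit.
    private void drainPending() {
        while (!pending.isEmpty()) {
            Pending p = pending.removeFirst();
            committed.add(p.offsets());
            p.callback().onComplete(p.offsets());
        }
    }

    public static void main(String[] args) {
        ToyCommitter consumer = new ToyCommitter();
        List<String> fired = new ArrayList<>();
        // Inside onPartitionsRevoked we cannot poll(), so the async commits just queue up...
        consumer.commitAsync(Map.of("topic-0", 10L), offsets -> fired.add("first"));
        consumer.commitAsync(Map.of("topic-0", 20L), offsets -> fired.add("second"));
        System.out.println("pending before: " + consumer.pendingCount());
        // ...and an empty synchronous commit is what flushes them, in order.
        consumer.commitSync(Map.of());
        System.out.println("fired: " + fired + ", pending after: " + consumer.pendingCount());
    }
}
```

If `commitSync` instead returned early for an empty map (the special case the thread objects to), `fired` would stay empty and a rebalance listener that cannot call `poll()` would have no way to learn that its commits had finished.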
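The thread-id check Erik mentions can also be sketched. As far as we understand the classic `KafkaConsumer` implementation, most public methods (`wakeup()` being the documented exception) enter a light-weight, reentrant ownership check: the first thread to enter records its id, re-entry from that same thread is allowed, and any other thread gets a `ConcurrentModificationException`. Because `poll` still owns the consumer while it runs `onPartitionsRevoked`, only the polling thread can commit from inside the callback. The sketch below is a simplified stand-alone re-implementation under that assumption; `ThreadOwnershipGuard` and `otherThreadRejected` are hypothetical names, not Kafka classes or methods.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone sketch of a reentrant, single-owner thread check.
final class ThreadOwnershipGuard {
    private static final long NO_OWNER = -1L;

    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Enter a guarded call: allowed if this thread already owns the guard or nobody does.
    void acquire() {
        long id = Thread.currentThread().getId();
        if (id != ownerThreadId.get() && !ownerThreadId.compareAndSet(NO_OWNER, id)) {
            throw new ConcurrentModificationException(
                "not safe for multi-threaded access: thread " + id + " vs owner " + ownerThreadId.get());
        }
        refCount.incrementAndGet();
    }

    // Leave a guarded call: ownership is dropped when the outermost call returns.
    void release() {
        if (refCount.decrementAndGet() == 0) {
            ownerThreadId.set(NO_OWNER);
        }
    }

    // Try to enter from a fresh thread; returns true if that thread was rejected.
    static boolean otherThreadRejected(ThreadOwnershipGuard guard) {
        AtomicBoolean rejected = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for helper thread", e);
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        ThreadOwnershipGuard guard = new ThreadOwnershipGuard();
        guard.acquire(); // like poll() entering
        guard.acquire(); // like commitSync() from the rebalance callback: same thread, allowed
        guard.release();
        System.out.println("other thread rejected: " + otherThreadRejected(guard));
        guard.release(); // poll() returns; the guard is free again
        System.out.println("other thread rejected after release: " + otherThreadRejected(guard));
    }
}
```

This is why zio-kafka cannot simply hand the commit to a fiber running on another thread while `onPartitionsRevoked` is executing: the polling thread still holds the guard until `poll` returns.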