Thanks for providing more details. Adding a config might be the path of least resistance... I am fine with that.
-Matthias

On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
>> vvvvvvvvvvvvvvvvvvv
>> long_poll.mode: return_on_records|return_on_response
>
> This idea LGTM. It not only makes minimal changes to the current behavior but also works for KIP-695.
>
> On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote:
>> Hi Matthias, Chia-Ping, and Tom,
>>
>> Thanks for the thoughtful replies!
>>
>> Re: poll(~forever~) to block indefinitely on records:
>> Thanks for your diligence, Chia-Ping. While I wouldn't personally recommend that anyone write code that blocks forever on I/O, I do agree this is something that "real people" may want to do.
>>
>> Just a note for the record: this approach should only be used in conjunction with a manual assignment. If people are using a group subscription, they're setting themselves up to get kicked out of the group when there is a low volume of updates on the topic. And then, when they get kicked out, they will never know it, because they're just going to be blocked in `poll()` the whole time.
>>
>> However, if you don't participate in a group and just:
>> 1. assign(partitions)
>> 2. poll(forever),
>> you should indeed expect to return from poll only when you have records.
>>
>> This possibility is the turning point for me. I'd like to alter my proposal to an opt-in config, detailed below.
>>
>> Re: Javadoc:
>> Thanks for pointing that out. It does seem like, if we do decide to change behavior, we should adjust the Javadoc to say so. That was an oversight on my part, and I daresay that if I had done that initially, it would have saved Rajini from having to dig into the code to pinpoint the cause of those test failures.
>>
>> Re: PollOptions:
>> I actually like this option quite a bit. It seems like this would be warranted if we expect someone to want to use the same Consumer instance in both "return on metadata or records" and "return on only records" mode. Otherwise, we might as well introduce a new config.
>>
>> It also seems like the behavior I proposed in this KIP is somewhat "advanced", so I could certainly see leaving it off by default and offering an opt-in config.
>>
>> How does everyone feel about this opt-in config:
>>
>> vvvvvvvvvvvvvvvvvvv
>> long_poll.mode: return_on_records|return_on_response
>>
>> doc:
>> * return_on_records: (default) a call to Consumer#poll(timeout) will block up to the timeout and return early if records are received.
>> * return_on_response: a call to Consumer#poll(timeout) will block up to the timeout and return early if any fetch response is received. Use this option to get updates from Consumer#metadata() even if Consumer#records() is empty.
>> ^^^^^^^^^^^^^^^^^^^
>>
>> Thanks,
>> John
>>
>> On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
>>> Hi,
>>>
>>> The Javadoc for KafkaConsumer#poll() includes the following:
>>>
>>> * This method returns immediately if there are records available. *Otherwise, it will await the passed timeout.*
>>> * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
>>> * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
>>>
>>> In other words: if the method returns before the timeout, there must be records in the method result. After the timeout has passed, there may be no records. It might block for longer than the timeout. So I think returning with empty records before at least the given timeout has passed breaks that contract.
>>>
>>> A not-much-prettier alternative to adding a new pollForRecordsOrMetadata(Duration) method could be overloading poll() to take an additional parameter that controls whether an early return with empty records is allowed. Or a `poll(PollOptions)`. In the long run it could be a mistake to include in the method name exactly what might cause an early empty return.
>>>
>>> Kind regards,
>>>
>>> Tom
>>>
>>> On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai <chia7...@apache.org> wrote:
>>>
>>>> Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern to make assumptions about whether poll() returns data or not.
>>>>
>>>> However, I checked all usages of poll() in the code base. There is an interesting use case, poll(a bigger timeout), which implies that callers want poll() to block (forever) until data is available.
>>>>
>>>> [1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
>>>> [2] https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
>>>>
>>>> Hence, I have started to worry that client code like the aforementioned cases will break due to the behavior change :(
>>>>
>>>> On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org> wrote:
>>>>> Thanks for your email, John.
>>>>>
>>>>> I agree that it seems to be an anti-pattern to write code that makes assumptions about whether poll() returns data or not. Thus, from my point of view, we should fix forward the system test.
>>>>>
>>>>> From my understanding, the impact of KIP-695 is that we might return early from poll() (i.e., before the timeout has passed) with no data, but only if an empty fetch response comes back and no other fetch returned data. Thus, in most cases, poll() should still return early and provide data. Hence, I have no concerns about the slight behavior change.
>>>>>
>>>>> It would be good to get input from others on this question, though.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 2/3/21 10:06 AM, John Roesler wrote:
>>>>>> Hello again all,
>>>>>>
>>>>>> I'm resurrecting this thread to discuss an issue that has come up after merging the code for this KIP.
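The two values of the proposed `long_poll.mode` config differ only in when poll(timeout) may return before its timeout. The following minimal Java sketch models that rule in isolation, assuming the semantics in John's proposed doc text and the Javadoc contract Tom quotes. `LongPollMode`, `FetchOutcome`, and `LongPollRule` are hypothetical names, not Kafka client classes, and the real consumer's internals may differ.

```java
import java.time.Duration;

// Hypothetical model of the two values proposed for long_poll.mode (not a real Kafka config).
enum LongPollMode {
    RETURN_ON_RECORDS,  // proposed default: an early return always carries records
    RETURN_ON_RESPONSE  // opt-in: any fetch response, even an empty one, ends the wait
}

// Hypothetical summary of what a single poll(timeout) call has observed so far.
final class FetchOutcome {
    final boolean anyResponseReceived;
    final int recordCount;

    FetchOutcome(boolean anyResponseReceived, int recordCount) {
        this.anyResponseReceived = anyResponseReceived;
        this.recordCount = recordCount;
    }
}

final class LongPollRule {
    private LongPollRule() {}

    // May poll(timeout) return before the timeout elapses?
    static boolean shouldReturnEarly(LongPollMode mode, FetchOutcome outcome) {
        switch (mode) {
            case RETURN_ON_RECORDS:
                // Keeps the existing Javadoc contract: an early return implies records.
                return outcome.recordCount > 0;
            case RETURN_ON_RESPONSE:
                // What KIP-695 needs: wake up on fresh metadata even without records.
                return outcome.anyResponseReceived;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    // In either mode the call returns (possibly empty) once the timeout has elapsed.
    // Rebalance-listener callbacks, which may extend the call, are not modeled here.
    static boolean shouldReturn(LongPollMode mode, FetchOutcome outcome,
                                Duration elapsed, Duration timeout) {
        return elapsed.compareTo(timeout) >= 0 || shouldReturnEarly(mode, outcome);
    }
}
```

Under this model, `RETURN_ON_RECORDS` keeps callers such as the poll(a bigger timeout) loops Chia-Ping found blocked until data arrives, while `RETURN_ON_RESPONSE` lets Streams wake up on metadata-only responses.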
>>>>>>
>>>>>> The issue is that some of the system tests need to be updated in the same way that this integration test needed to be updated:
>>>>>> https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
>>>>>>
>>>>>> This issue was reported here:
>>>>>> https://issues.apache.org/jira/browse/KAFKA-12268
>>>>>> and there is some preliminary discussion here:
>>>>>> https://github.com/apache/kafka/pull/10022
>>>>>>
>>>>>> First, let me offer my apologies for failing to catch this before the merge. I'm sorry that it became Rajini's work to track down the cause of the failure, when it was my responsibility to ensure the feature was merged safely.
>>>>>>
>>>>>> To recap the situation:
>>>>>> Consumer#poll(Duration) will now return before the duration expires, even if no records are returned, as long as some metadata is returned.
>>>>>>
>>>>>> This behavior was important for KIP-695. In the situation where we get no records back for some partition, Streams needs to have the freshest possible information about whether there are no new records on the broker, or whether there are records on the broker that we still need to fetch. If that's not clear, the KIP contains the full story.
>>>>>>
>>>>>> It's definitely a behavior change, but our rationale was that it's an acceptable behavior change. Our main alternative is to add a _new_ method to Consumer, pollForRecordsOrMetadata(Duration) or something similar.
>>>>>>
>>>>>> In general, it seems unreliable to expect the broker to return a particular record within a particular timeout, which is what these tests are doing. The broker can decide, for several reasons, not to return data for one partition but to return data for another partition instead.
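One way to fix forward tests that expect a particular record from a single poll() is to poll in a loop until the record appears or an overall deadline passes, treating empty batches as "keep waiting". A minimal sketch that assumes nothing beyond the JDK: `RecordSource` and `PollUntil` are hypothetical stand-ins so the example runs without a broker. With the real client, the loop would call `consumer.poll(Duration)` and iterate the returned `ConsumerRecords`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

// Hypothetical stand-in for Consumer#poll(Duration): it may return an empty batch
// before the timeout, e.g. when a fetch response carried only metadata.
interface RecordSource<T> {
    List<T> poll(Duration timeout);
}

final class PollUntil {
    private PollUntil() {}

    // Polls until a record matching `expected` is seen or `maxWait` elapses.
    // An empty batch means "keep waiting", never "the record does not exist".
    // Returns every record consumed up to and including the match.
    static <T> List<T> pollUntilFound(RecordSource<T> source, Predicate<? super T> expected,
                                      Duration maxWait, Duration pollTimeout,
                                      LongSupplier nanoClock) {
        long deadline = nanoClock.getAsLong() + maxWait.toNanos();
        List<T> seen = new ArrayList<>();
        while (true) {
            for (T record : source.poll(pollTimeout)) {
                seen.add(record);
                if (expected.test(record)) {
                    return seen;
                }
            }
            // Compare via subtraction so the check stays correct if nanoTime wraps.
            if (nanoClock.getAsLong() - deadline >= 0) {
                throw new IllegalStateException("expected record not seen within " + maxWait);
            }
        }
    }
}
```

Passing `System::nanoTime` as the clock gives wall-clock behavior; a test can inject a fake clock to exercise the timeout path deterministically. Because the helper never relies on a single poll returning data, it behaves the same whether or not empty early returns are allowed.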
>>>>>>
>>>>>> It seems like the only case where you might reasonably try to rely on that is in a test, where you first write a record to a partition, then you assign only that one partition to a consumer, then you poll on the consumer, expecting it to return the data you just wrote.
>>>>>>
>>>>>> So the $10 question here is whether we should support this apparently artificial (testing-only) use case to the point where it's worth adding a whole new method to the Consumer interface.
>>>>>>
>>>>>> Thanks all,
>>>>>> John
>>>>>>
>>>>>> On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
>>>>>>> Thanks Jason,
>>>>>>>
>>>>>>> We would only return the metadata for the latest fetches. So, if someone wanted to use this to lazily maintain a client-side metadata map for all partitions, they'd have to store it separately and merge in new updates as they arrive.
>>>>>>>
>>>>>>> This way:
>>>>>>> 1. We don't need to increase the complexity of the client by storing that metadata.
>>>>>>> 2. Users will be able to treat all returned metadata as "fresh" without having to reason about the timestamps.
>>>>>>> 3. All parts of the returned ConsumerRecords object have the same lifecycle: all the data and metadata are the results of the most recent round of fetch responses that had not been previously polled.
>>>>>>>
>>>>>>> Does that seem sensible to you? I'll update the KIP to clarify this.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
>>>>>>>> Hi John,
>>>>>>>>
>>>>>>>> Just one question. It wasn't very clear to me exactly when the metadata would be returned in `ConsumerRecords`. Would we /always/ include the metadata for all partitions that are assigned, or would it be based on the latest fetches?
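John's answer implies that anyone who wants a view of every assigned partition must keep their own map and merge each poll's metadata into it. A minimal sketch under stated assumptions: `PartitionMetadata` is a hypothetical stand-in for the KIP's `Metadata` type (its `position` and `endOffset` fields and the derived `lag()` are modeled on the names discussed in this thread), and partitions are keyed by plain strings instead of `TopicPartition` so the example runs without the Kafka client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition snapshot; field names mirror the ones discussed in
// this thread (position, end offset, lag) but this is not the KIP's actual class.
final class PartitionMetadata {
    final long position;   // consumer position after the fetch (semantics assumed)
    final long endOffset;  // log end offset reported in the fetch response

    PartitionMetadata(long position, long endOffset) {
        this.position = position;
        this.endOffset = endOffset;
    }

    long lag() {
        return Math.max(0L, endOffset - position);
    }
}

// Client-side cache for the "latest fetches only" metadata that each poll returns.
// Keys are plain "topic-partition" strings so the sketch runs without the Kafka client.
final class MetadataCache {
    private final Map<String, PartitionMetadata> latest = new HashMap<>();

    // Every entry from the newest poll is treated as fresh and overwrites the old one;
    // partitions missing from this poll keep their previous snapshot.
    void merge(Map<String, PartitionMetadata> fromPoll) {
        latest.putAll(fromPoll);
    }

    // Call on revocation so stale entries do not outlive the assignment.
    void forget(String partition) {
        latest.remove(partition);
    }

    PartitionMetadata get(String partition) {
        return latest.get(partition);
    }

    int size() {
        return latest.size();
    }
}
```

Overwriting with `putAll` relies on point 2 of the reply: each poll's metadata can be treated as fresh, so no timestamp comparison is needed.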
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jason
>>>>>>>>
>>>>>>>> On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Guozhang!
>>>>>>>>>
>>>>>>>>> All of your feedback sounds good to me. I’ll update the KIP when I am able.
>>>>>>>>>
>>>>>>>>> 3) I believe it is the position after the fetch, but I will confirm. I think omitting position may render the beginning and end offsets useless as well, which leaves only lag. That would be fine with me, but it also seems nice to supply this extra metadata, since it is well defined and probably handy for others. Therefore, I’d go the route of specifying the exact semantics and keeping it.
>>>>>>>>>
>>>>>>>>> Thanks for the review,
>>>>>>>>> John
>>>>>>>>>
>>>>>>>>> On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
>>>>>>>>>> Hello John,
>>>>>>>>>>
>>>>>>>>>> Thanks for the updates! I've made a pass on the KIP and also the POC PR; here are some minor comments:
>>>>>>>>>>
>>>>>>>>>> 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated, and we do not create a new object but just update the values in place, so maybe calling it `lastUpdateTimestamp` is better?
>>>>>>>>>>
>>>>>>>>>> 2) It would be great to state in the Javadocs that the new API "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a superset of the TopicPartitions returned by the existing API that returns the data by partition, in case users assume the two maps always have the same keys.
>>>>>>>>>>
>>>>>>>>>> 3) The "position()" API of the call needs better clarification: is it the current position AFTER the records are returned, or is it BEFORE the records are returned? Personally, I'd suggest we not include it if it is not used anywhere yet, just to avoid possible misuse, but I'm fine if you'd like to keep it; in that case, just clarify its semantics.
>>>>>>>>>>
>>>>>>>>>> Other than that, I'm +1 on the KIP as well!
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP!
>>>>>>>>>>>
>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>
>>>>>>>>>>> walker
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP, John!
>>>>>>>>>>>>
>>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Bruno
>>>>>>>>>>>>
>>>>>>>>>>>> On 08.12.20 18:03, John Roesler wrote:
>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> There hasn't been much discussion on KIP-695 so far, so I'd like to go ahead and call for a vote.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As a reminder, the purpose of KIP-695 is to improve on the "task idling" feature we introduced in KIP-353. This KIP will allow Streams to offer deterministic time semantics in join-type topologies. For example, it makes sure that when you join two topics, we collate the topics by timestamp. That was always the intent with task idling (KIP-353), but it turns out the previous mechanism couldn't provide the desired semantics.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The details are here:
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/x/JSXZCQ
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
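The collation goal described in the vote message can be illustrated with a simplified decision: process the buffered partition with the earliest timestamp, but if another input partition is empty and fetch metadata does not show it is caught up (lag of zero), idle for up to a bounded time. This is a hypothetical sketch of the idea, not Kafka Streams' implementation; `PartitionState`, `TaskIdling`, and the `maxIdleMs` parameter (loosely modeled on the existing `max.task.idle.ms` config) are illustrative assumptions.

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical view of one input partition of a join task.
final class PartitionState {
    final String name;
    final OptionalLong headTimestamp; // timestamp of the next buffered record, if any
    final OptionalLong lag;           // from the latest fetch metadata; empty if unknown

    PartitionState(String name, OptionalLong headTimestamp, OptionalLong lag) {
        this.name = name;
        this.headTimestamp = headTimestamp;
        this.lag = lag;
    }
}

final class TaskIdling {
    private TaskIdling() {}

    // Returns the partition whose next record should be processed, or empty to keep idling.
    // idledMs is how long the task has already waited; maxIdleMs bounds that wait.
    static Optional<String> nextPartition(List<PartitionState> inputs, long idledMs, long maxIdleMs) {
        PartitionState earliest = null;
        boolean mustWait = false;
        for (PartitionState p : inputs) {
            if (p.headTimestamp.isPresent()) {
                if (earliest == null
                        || p.headTimestamp.getAsLong() < earliest.headTimestamp.getAsLong()) {
                    earliest = p;
                }
            } else {
                // Empty locally: only safe to skip if metadata says the broker has nothing more.
                boolean caughtUp = p.lag.isPresent() && p.lag.getAsLong() == 0L;
                if (!caughtUp) {
                    mustWait = true; // the broker may hold an earlier-timestamped record
                }
            }
        }
        if (earliest == null) {
            return Optional.empty(); // nothing buffered anywhere
        }
        if (mustWait && idledMs < maxIdleMs) {
            return Optional.empty(); // idle instead of risking out-of-order processing
        }
        return Optional.of(earliest.name);
    }
}
```

The lag check is why the thread cares about poll() returning on metadata-only responses: without fresh lag information, an empty partition is indistinguishable from one whose records simply have not been fetched yet.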