> vvvvvvvvvvvvvvvvvvv
> long_poll.mode: return_on_records|return_on_response
This idea LGTM. It not only makes minimal changes to the current behavior but also works for KIP-695.

On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote:
> Hi Matthias, Chia-Ping, and Tom,
>
> Thanks for the thoughtful replies!
>
> Re: poll(~forever~) to block indefinitely on records:
> Thanks for your diligence, Chia-Ping. While I wouldn't
> personally recommend that anyone write code that blocks
> forever on I/O, I do agree this is something that "real
> people" may want to do.
>
> Just a note for the record: this approach should only be
> used in conjunction with a manual assignment. If people are
> using a group subscription, they're setting themselves up to
> get kicked out of the group when there is a low volume of
> updates on the topic. And then, when they get kicked out,
> they will never know it because they're just going to be
> blocked in `poll()` the whole time.
>
> However, if you don't participate in a group and just:
> 1. assign(partitions)
> 2. poll(forever),
> you should indeed expect to return from poll only when you
> have records.
>
> This possibility is the turning point for me. I'd like to
> alter my proposal to an opt-in config, detailed below.
>
> Re: Javadoc:
> Thanks for pointing that out. It does seem like, if we do
> decide to change behavior, we should adjust the Javadoc to
> say so. That was an oversight on my part, and I daresay that
> if I had done that initially, it would have saved Rajini
> from having to dig into the code to pinpoint the cause of
> those test failures.
>
> Re: PollOptions:
> I actually like this option quite a bit. It seems like this
> would be warranted if we expect someone to want to use the
> same Consumer instance in both "return on metadata or
> records" and "return on only records" mode. Otherwise, we
> might as well introduce a new config.
>
> It also seems like the behavior I proposed in this KIP is
> somewhat "advanced", so I could certainly see leaving it off
> by default and offering an opt-in config.
>
> How does everyone feel about this opt-in config:
>
> vvvvvvvvvvvvvvvvvvv
> long_poll.mode: return_on_records|return_on_response
>
> doc:
> * return_on_records: (default) a call to
> Consumer#poll(timeout) will block up to the timeout and
> return early if records are received.
> * return_on_response: a call to Consumer#poll(timeout) will
> block up to the timeout and return early if any fetch
> response is received. Use this option to get updates from
> ConsumerRecords#metadata() even if no records are returned.
> ^^^^^^^^^^^^^^^^^^^
>
> Thanks,
> John
>
> On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
> > Hi,
> >
> > The Javadoc for KafkaConsumer#poll() includes the following:
> >
> > > * This method returns immediately if there are records available.
> > > *Otherwise, it will await the passed timeout.*
> > > * If the timeout expires, an empty record set will be returned. Note that
> > > * this method may block beyond the timeout in order to execute custom
> > > * {@link ConsumerRebalanceListener} callbacks.
> >
> > In other words: If the method returns before the timeout, there must be
> > records in the method result. After the timeout has passed, there may be no
> > records. It might block for longer than the timeout. So I think returning
> > with empty records before at least the given timeout has passed breaks that
> > contract.
> >
> > A not-much-prettier alternative to adding a new
> > pollForRecordsOrMetadata(Duration) method could be overloading poll() to
> > take an additional parameter that controls whether an early return with
> > empty records is allowed. Or a `poll(PollOptions)`. In the long run it
> > could be a mistake to include in the method name exactly what might cause
> > an early empty return.
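The two values of the proposed `long_poll.mode` setting can be compared with a small simulation. This is a minimal sketch, not Kafka client code. `PollModeSketch`, `FetchResponse`, and `PollResult` are hypothetical names. A poll is modeled as a scan over simulated fetch responses, sorted by arrival time, that arrive within the timeout. `honorsJavadocContract` encodes Tom's reading of the `poll()` Javadoc: an early return must carry records.

```java
import java.util.List;

/** Hypothetical model of the two proposed long_poll.mode behaviors; not Kafka code. */
final class PollModeSketch {

    enum Mode { RETURN_ON_RECORDS, RETURN_ON_RESPONSE }

    /** A simulated fetch response: when it arrives and how many records it carries. */
    static final class FetchResponse {
        final long arrivalMs;
        final int recordCount;

        FetchResponse(long arrivalMs, int recordCount) {
            this.arrivalMs = arrivalMs;
            this.recordCount = recordCount;
        }
    }

    /** Outcome of a simulated poll: when it returned and how many records it handed back. */
    static final class PollResult {
        final long returnedAtMs;
        final int records;

        PollResult(long returnedAtMs, int records) {
            this.returnedAtMs = returnedAtMs;
            this.records = records;
        }
    }

    /**
     * Simulates poll(timeoutMs) starting at time 0; responses must be sorted by arrival time.
     * RETURN_ON_RECORDS returns early only for a response that carries records.
     * RETURN_ON_RESPONSE returns early for any response, even an empty one.
     */
    static PollResult poll(List<FetchResponse> responses, long timeoutMs, Mode mode) {
        for (FetchResponse r : responses) {
            if (r.arrivalMs > timeoutMs) {
                break; // this and every later response arrive after the timeout
            }
            if (r.recordCount > 0 || mode == Mode.RETURN_ON_RESPONSE) {
                return new PollResult(r.arrivalMs, r.recordCount);
            }
        }
        return new PollResult(timeoutMs, 0); // timeout expired without an early return
    }

    /** Tom's reading of the Javadoc: returning before the timeout requires records. */
    static boolean honorsJavadocContract(PollResult result, long timeoutMs) {
        return result.returnedAtMs >= timeoutMs || result.records > 0;
    }
}
```

Suppose an empty response arrives at 10 ms, a response with 3 records arrives at 50 ms, and the timeout is 100 ms. `RETURN_ON_RECORDS` then returns at 50 ms with 3 records. `RETURN_ON_RESPONSE` returns at 10 ms with none, which is the early empty return Tom argues breaks the documented contract.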
> >
> > Kind regards,
> >
> > Tom
> >
> > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai <chia7...@apache.org> wrote:
> >
> > > Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern
> > > to assume whether or not poll() returns data.
> > >
> > > However, I checked all usages of poll() in the code base. There is an
> > > interesting use case, poll(a bigger timeout), which implies that callers
> > > want poll() to block (forever) unless data is available.
> > >
> > > [1]
> > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> > > [2]
> > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
> > >
> > > Hence, I am starting to worry that client code like the aforementioned
> > > cases will break due to the behavior change :(
> > >
> > > On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org> wrote:
> > > > Thanks for your email John.
> > > >
> > > > I agree that it seems to be an anti-pattern to write code that makes
> > > > assumptions about whether poll() returns data or not. Thus, we should
> > > > fix-forward the system test from my point of view.
> > > >
> > > > From my understanding, the impact of KIP-695 is that we might return
> > > > early from poll() (i.e., before the timeout passed) with no data, only if
> > > > an empty fetch request comes back and there is no other fetch request
> > > > that did return data. Thus, for most cases, poll() should still return
> > > > early and provide data. -- Thus, I have no concerns with the slight
> > > > behavior change.
> > > >
> > > > Would be good to get input from others about this question though.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 2/3/21 10:06 AM, John Roesler wrote:
> > > > > Hello again all,
> > > > >
> > > > > I'm resurrecting this thread to discuss an issue that has
> > > > > come up after merging the code for this KIP.
> > > > >
> > > > > The issue is that some of the system tests need to be
> > > > > updated in the same way that this integration test needed to
> > > > > be updated:
> > > > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> > > > >
> > > > > This issue was reported here:
> > > > > https://issues.apache.org/jira/browse/KAFKA-12268
> > > > > and there is some preliminary discussion here:
> > > > > https://github.com/apache/kafka/pull/10022
> > > > >
> > > > > First, let me offer my apologies for failing to catch this
> > > > > before the merge. I'm sorry that it became Rajini's work to
> > > > > track down the cause of the failure, when it was my
> > > > > responsibility to ensure the feature was merged safely.
> > > > >
> > > > > To recap the situation:
> > > > > Consumer#poll(Duration) will now return before the duration
> > > > > expires, even if no records are returned, as long as some
> > > > > metadata is returned.
> > > > >
> > > > > This behavior was important for KIP-695. In the situation
> > > > > where we get no records back for some partition, Streams
> > > > > needs to have the freshest possible information about
> > > > > whether there are no new records on the broker, or whether
> > > > > there are records on the broker that we still need to fetch.
> > > > > If that's not clear, the KIP contains the full story.
> > > > >
> > > > > It's definitely a behavior change, but our rationale was
> > > > > that it's an acceptable behavior change. Our big alternative
> > > > > is to add a _new_ method to Consumer, such as
> > > > > pollForRecordsOrMetadata(Duration) or something.
> > > > >
> > > > > It seems unreliable to expect the broker to return a
> > > > > particular record within a particular timeout in general,
> > > > > which is what these tests are doing.
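A test that writes a record, assigns one partition, and expects a single `poll()` to return that record can be fixed forward. One way is to keep polling until the record shows up or an overall deadline passes, treating an empty batch as "poll again". The helper below is a hypothetical sketch. A `Supplier<List<T>>` stands in for one `consumer.poll(Duration)` call, and an injectable millisecond clock replaces wall time so the logic can be exercised without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical test helper: poll repeatedly until a matching record appears or a deadline passes. */
final class PollUntil {

    /**
     * @param poll       stands in for one consumer.poll(...) call; may return an empty batch early
     * @param matches    identifies the record the test is waiting for
     * @param clockMs    current time in milliseconds (injectable for testing)
     * @param deadlineMs absolute time after which the helper gives up
     * @return every record seen, ending with the first match
     * @throws AssertionError if the deadline passes without a match
     */
    static <T> List<T> pollUntil(Supplier<List<T>> poll, Predicate<T> matches,
                                 LongSupplier clockMs, long deadlineMs) {
        List<T> seen = new ArrayList<>();
        while (clockMs.getAsLong() < deadlineMs) {
            // An empty batch is not a failure; it just means "poll again".
            for (T record : poll.get()) {
                seen.add(record);
                if (matches.test(record)) {
                    return seen;
                }
            }
        }
        throw new AssertionError("no matching record before the deadline; saw " + seen);
    }
}
```

The design choice is that the test asserts on an overall deadline rather than on the outcome of any single `poll()`. This works the same whether or not `poll()` may return early with no records.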
> > > > > The broker can decide for several reasons not to return data
> > > > > for a partition, but return data for another partition instead.
> > > > >
> > > > > It seems like the only case where you might reasonably try
> > > > > to rely on that is in a test, where you first write a record
> > > > > to a partition, then you assign only that one partition to a
> > > > > consumer, then you poll on the consumer, expecting it to
> > > > > return the data you just wrote.
> > > > >
> > > > > So the $10 question here is whether we should support this
> > > > > apparently artificial (testing-only) use case to the point
> > > > > where it's worth adding a whole new method to the Consumer
> > > > > interface.
> > > > >
> > > > > Thanks all,
> > > > > John
> > > > >
> > > > > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> > > > > > Thanks Jason,
> > > > > >
> > > > > > We would only return the metadata for the latest fetches.
> > > > > > So, if someone wanted to use this to lazily maintain a
> > > > > > client-side metadata map for all partitions, they'd have to
> > > > > > store it separately and merge in new updates as they arrive.
> > > > > >
> > > > > > This way:
> > > > > > 1. We don't need to increase the complexity of the client by
> > > > > > storing that metadata
> > > > > > 2. Users will be able to treat all returned metadata as
> > > > > > "fresh" without having to reason about the timestamps.
> > > > > > 3. All parts of the returned ConsumerRecords object have the
> > > > > > same lifecycle: all the data and metadata are the results of
> > > > > > the most recent round of fetch responses that had not been
> > > > > > previously polled.
> > > > > >
> > > > > > Does that seem sensible to you? I'll update the KIP to
> > > > > > clarify this.
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> > > > > > > Hi John,
> > > > > > >
> > > > > > > Just one question.
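A caller that wants the client-side metadata map for all partitions, as John describes, can keep one itself and merge each poll's metadata into it. This is a minimal sketch under stated assumptions. `MetadataCache` is a hypothetical class, and partitions are keyed by plain strings such as `"orders-0"` rather than `TopicPartition`. The metadata value type `M` is left generic instead of guessing the shape of the proposed per-partition metadata class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical client-side cache of per-partition fetch metadata, merged poll by poll. */
final class MetadataCache<M> {
    private final Map<String, M> latestByPartition = new HashMap<>();

    /**
     * Merges the metadata returned by one poll. Each poll only carries metadata for the
     * partitions in its latest round of fetch responses, so entries for other partitions
     * are kept from earlier polls, and a partition that appears again is overwritten.
     */
    void merge(Map<String, M> fromLatestPoll) {
        latestByPartition.putAll(fromLatestPoll);
    }

    /** Returns the most recently merged metadata for a partition, or null if none was seen. */
    M get(String partition) {
        return latestByPartition.get(partition);
    }

    int size() {
        return latestByPartition.size();
    }
}
```

Per John's reply, every returned entry comes from the most recent round of fetch responses. A plain overwrite is therefore enough, and no timestamp comparison is needed.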
> > > > > > > It wasn't very clear to me exactly when the metadata
> > > > > > > would be returned in `ConsumerRecords`. Would we /always/ include
> > > > > > > the metadata for all partitions that are assigned, or would it be
> > > > > > > based on the latest fetches?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jason
> > > > > > >
> > > > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > >
> > > > > > > > Thanks, Guozhang!
> > > > > > > >
> > > > > > > > All of your feedback sounds good to me. I’ll update the KIP
> > > > > > > > when I am able.
> > > > > > > >
> > > > > > > > 3) I believe it is the position after the fetch, but I will confirm. I
> > > > > > > > think omitting position may render beginning and end offsets useless as
> > > > > > > > well, which leaves only lag. That would be fine with me, but it also seems
> > > > > > > > nice to supply this extra metadata since it is well defined and probably
> > > > > > > > handy for others. Therefore, I’d go the route of specifying the exact
> > > > > > > > semantics and keeping it.
> > > > > > > >
> > > > > > > > Thanks for the review,
> > > > > > > > John
> > > > > > > >
> > > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > > > > Hello John,
> > > > > > > > >
> > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also the
> > > > > > > > > POC PR; here are some minor comments:
> > > > > > > > >
> > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting
> > > > > > > > > updated, and we do not create a new object but just update the values
> > > > > > > > > in-place, so maybe calling it `lastUpdateTimestamp` is better?
> > > > > > > > >
> > > > > > > > > 2) It will be great to clarify in the javadocs that the new API
> > > > > > > > > "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
> > > > > > > > > superset of the TopicPartitions covered by the existing API that returns
> > > > > > > > > the data by partition, in case users assume the map keys would always be
> > > > > > > > > the same.
> > > > > > > > >
> > > > > > > > > 3) The "position()" API of the call needs better clarification: is it the
> > > > > > > > > current position AFTER the records are returned, or is it BEFORE the
> > > > > > > > > records are returned? Personally I'd suggest we do not include it if it is
> > > > > > > > > not used anywhere yet, just to avoid possible misuse, but I'm fine if you
> > > > > > > > > would like to keep it; in that case just clarify its semantics.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Other than that, I'm +1 on the KIP as well!
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the KIP!
> > > > > > > > > >
> > > > > > > > > > +1 (non-binding)
> > > > > > > > > >
> > > > > > > > > > walker
> > > > > > > > > >
> > > > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP, John!
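Guozhang's third point matters because lag is derived from the position. The sketch below follows John's tentative answer and assumes the position is the one after the fetch, i.e. the next offset to consume. It also assumes lag is the end offset minus that position. `FetchMetadataSketch` and its fields are hypothetical, not the KIP's API. A second method encodes point 2: the partitions with metadata may be a superset of the partitions with records.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical per-partition fetch metadata; names and semantics are illustrative assumptions. */
final class FetchMetadataSketch {
    final long position;  // assumed: next offset to consume, i.e. AFTER the returned records
    final long endOffset; // assumed: end offset reported by the broker for this partition

    FetchMetadataSketch(long position, long endOffset) {
        this.position = position;
        this.endOffset = endOffset;
    }

    /** Records on the broker that this consumer has not fetched yet, under the assumptions above. */
    long lag() {
        return endOffset - position;
    }

    /**
     * Point 2: metadata may cover more partitions than the records do (for example, a
     * partition whose fetch came back empty), so callers should not assume equal key sets.
     */
    static boolean metadataCoversRecords(Set<String> partitionsWithRecords,
                                         Map<String, FetchMetadataSketch> metadata) {
        return metadata.keySet().containsAll(partitionsWithRecords);
    }
}
```

Consider a consumer at offset 40 that receives records 40 and 41, with an end offset of 50. Under the after-fetch reading, the position is 42 and the lag is 8. Under a before-fetch reading, the same response would report position 40 and lag 10, overstating the remaining work by the two records just returned.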
> > > > > > > > > > >
> > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Bruno
> > > > > > > > > > >
> > > > > > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > > > > > Hello all,
> > > > > > > > > > > >
> > > > > > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > > > > > > like to go ahead and call for a vote.
> > > > > > > > > > > >
> > > > > > > > > > > > As a reminder, the purpose of KIP-695 is to improve on the
> > > > > > > > > > > > "task idling" feature we introduced in KIP-353. This KIP
> > > > > > > > > > > > will allow Streams to offer deterministic time semantics in
> > > > > > > > > > > > join-type topologies. For example, it makes sure that
> > > > > > > > > > > > when you join two topics, we collate the topics by
> > > > > > > > > > > > timestamp. That was always the intent with task idling
> > > > > > > > > > > > (KIP-353), but it turns out the previous mechanism couldn't
> > > > > > > > > > > > provide the desired semantics.
> > > > > > > > > > > >
> > > > > > > > > > > > The details are here:
> > > > > > > > > > > > https://cwiki.apache.org/confluence/x/JSXZCQ
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > -John
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
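The timestamp collation that task idling aims for can be pictured as a two-input scheduler. When both inputs have buffered records, it processes the older one. When one input is empty, it proceeds only if that partition is known to be caught up; otherwise it idles until a fetch brings fresher information. This is a deliberately simplified sketch, not Kafka Streams' implementation. `CollationSketch` is hypothetical, buffers hold only timestamps, and "caught up" is assumed to mean a known lag of zero.

```java
import java.util.Deque;
import java.util.OptionalLong;

/** Simplified two-input timestamp collation with idling; hypothetical, not Streams' code. */
final class CollationSketch {

    enum Step { TAKE_LEFT, TAKE_RIGHT, IDLE }

    /**
     * Decides what to process next.
     *
     * @param left     buffered record timestamps for the left input, oldest first
     * @param right    buffered record timestamps for the right input, oldest first
     * @param leftLag  known lag of the left partition, or empty if not yet known
     * @param rightLag known lag of the right partition, or empty if not yet known
     */
    static Step next(Deque<Long> left, Deque<Long> right,
                     OptionalLong leftLag, OptionalLong rightLag) {
        boolean leftHas = !left.isEmpty();
        boolean rightHas = !right.isEmpty();
        if (leftHas && rightHas) {
            // Both inputs have data: process the older record (ties go to the left input here).
            return left.peekFirst() <= right.peekFirst() ? Step.TAKE_LEFT : Step.TAKE_RIGHT;
        }
        if (leftHas && caughtUp(rightLag)) {
            return Step.TAKE_LEFT; // right is empty and known to have nothing more on the broker
        }
        if (rightHas && caughtUp(leftLag)) {
            return Step.TAKE_RIGHT; // left is empty and known to have nothing more on the broker
        }
        // An empty input may still have unfetched, older records: wait for fresher information.
        return Step.IDLE;
    }

    private static boolean caughtUp(OptionalLong lag) {
        return lag.isPresent() && lag.getAsLong() == 0L;
    }
}
```

Take left = [5] with an empty right input. The step is `IDLE` while the right lag is unknown or 2, and `TAKE_LEFT` once the right lag is known to be 0. Learning that lag promptly, even from a fetch that returns no records, is why the thread wants `poll()` to be able to return on an empty response.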