Thanks everyone for chiming in here! I'd also prefer the config approach over
API changes.
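
For anyone skimming the thread, here is a minimal sketch of how the two
proposed `long_poll.mode` values would differ. It is a plain-Java simulation
under stated assumptions, not the consumer's actual implementation:
`LongPollMode`, `PollResult`, `poll`, and `sampleResponses` are hypothetical
names made up for illustration, and a count of fetch responses stands in for
the poll timeout.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

public class LongPollModeSketch {

    // Hypothetical enum mirroring the two proposed values of long_poll.mode.
    enum LongPollMode { RETURN_ON_RECORDS, RETURN_ON_RESPONSE }

    // Outcome of one simulated poll: the records returned and how many
    // fetch responses were consumed before returning.
    static final class PollResult {
        final List<String> records;
        final int responsesConsumed;

        PollResult(List<String> records, int responsesConsumed) {
            this.records = records;
            this.responsesConsumed = responsesConsumed;
        }
    }

    // Simulated poll. Each queue element stands for one fetch response
    // (possibly with no records); maxResponses stands in for the timeout.
    static PollResult poll(Deque<List<String>> responses, LongPollMode mode, int maxResponses) {
        int consumed = 0;
        while (consumed < maxResponses && !responses.isEmpty()) {
            List<String> records = responses.poll();
            consumed++;
            boolean hasRecords = !records.isEmpty();
            // return_on_records: return early only when records arrived.
            // return_on_response: return early on any response, even an empty one.
            if (hasRecords || mode == LongPollMode.RETURN_ON_RESPONSE) {
                return new PollResult(records, consumed);
            }
        }
        // Simulated timeout: nothing satisfied the return condition.
        return new PollResult(Collections.<String>emptyList(), consumed);
    }

    // Two empty fetch responses followed by one that carries a record.
    static Deque<List<String>> sampleResponses() {
        Deque<List<String>> queue = new ArrayDeque<>();
        queue.add(Collections.<String>emptyList());
        queue.add(Collections.<String>emptyList());
        queue.add(Arrays.asList("record-1"));
        return queue;
    }

    public static void main(String[] args) {
        PollResult onRecords = poll(sampleResponses(), LongPollMode.RETURN_ON_RECORDS, 10);
        PollResult onResponse = poll(sampleResponses(), LongPollMode.RETURN_ON_RESPONSE, 10);
        System.out.println(onRecords.records + " after " + onRecords.responsesConsumed + " response(s)");
        System.out.println(onResponse.records + " after " + onResponse.responsesConsumed + " response(s)");
    }
}
```

In this toy model the default mode keeps waiting through the two empty
responses and returns the record, while `return_on_response` returns an empty
result after the first response. That early empty return is the behavior that
surprised the system tests, which is why keeping it opt-in seems the least
disruptive.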

On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck <bbej...@gmail.com> wrote:

> I meant to chime in earlier.
>
> I also like the `PollOptions` idea, but I have to agree that the config
> option would be the least disruptive approach.
>
> Thanks,
> Bill
>
> On Fri, Feb 5, 2021 at 6:12 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks, all!
> >
> > It seems that the config I proposed is a solution that
> > everyone can be happy with, so I will go ahead with a PR to
> > fix that.
> >
> > I'll update the KIP after a round of PR reviews, in case
> > there are new concerns that arise.
> >
> > Thanks,
> > -John
> >
> > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > Thanks for providing more details.
> > >
> > > > Adding a config might be the way of least resistance... I am fine with that.
> > >
> > > -Matthias
> > >
> > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > vvvvvvvvvvvvvvvvvvv
> > > > > long_poll.mode: return_on_records|return_on_response
> > > >
> > > > > This idea LGTM. It not only makes minimal changes to current behavior
> > > > > but also works for KIP-695.
> > > >
> > > > On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote:
> > > > > Hi Matthias, Chia-Ping, and Tom,
> > > > >
> > > > > Thanks for the thoughtful replies!
> > > > >
> > > > > Re: poll(~forever~) to block indefinitely on records:
> > > > > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > > > personally recommend for anyone to write code that blocks
> > > > > forever on I/O, I do agree this is something that "real
> > > > > people" may want to do.
> > > > >
> > > > > Just a note for the record, this approach should only be
> > > > > used in conjunction with a manual assignment. If people are
> > > > > using a group subscription, they're setting themselves up to
> > > > > get kicked out of the group when there is low volume of
> > > > > updates on the topic. And then, when they get kicked out,
> > > > > they will never know it because they're just going to be
> > > > > blocked in `poll()` the whole time.
> > > > >
> > > > > However, if you don't participate in a group and just:
> > > > > 1 assign(partitions)
> > > > > 2 poll(forever),
> > > > > you should indeed expect to return from poll only when you
> > > > > have records.
> > > > >
> > > > > This possibility is the turning point for me. I'd like to
> > > > > alter my proposal to an opt-in config, detailed below.
> > > > >
> > > > > Re: Javadoc:
> > > > > Thanks for pointing that out. It does seem like, if we do
> > > > > decide to change behavior, we should adjust the Javadoc to
> > > > > say so. That was an oversight on my part, and I daresay that
> > > > > if I had done that initially, it would have saved Rajini
> > > > > from having to dig into the code to pinpoint the cause of
> > > > > those test failures.
> > > > >
> > > > > Re: PollOptions:
> > > > > I actually like this option quite a bit. It seems like this
> > > > > would be warranted if we expect someone to want to use the
> > > > > same Consumer instance in both "return on metadata or
> > > > > records" and "return on only records" mode. Otherwise, we
> > > > > might as well introduce a new config.
> > > > >
> > > > > It also seems like the behavior I proposed in this KIP is
> > > > > somewhat "advanced", so I could certainly see leaving it off
> > > > > by default and offering an opt-in config.
> > > > >
> > > > > How does everyone feel about this opt-in config:
> > > > >
> > > > > vvvvvvvvvvvvvvvvvvv
> > > > > long_poll.mode: return_on_records|return_on_response
> > > > >
> > > > > doc:
> > > > > * return_on_records: (default) a call to
> > > > > Consumer#poll(timeout) will block up to the timeout and
> > > > > return early if records are received.
> > > > > * return_on_response: a call to Consumer#poll(timeout) will
> > > > > block up to the timeout and return early if any fetch
> > > > > response is received. Use this option to get updates from
> > > > > Consumer#metadata() even if Consumer#records() is empty.
> > > > > ^^^^^^^^^^^^^^^^^^^
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
> > > > > > Hi,
> > > > > >
> > > > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > > >
> > > > > > * This method returns immediately if there are records available.
> > *Otherwise,
> > > > > > > it will await the passed timeout.*
> > > > > > > * If the timeout expires, an empty record set will be returned.
> > Note that
> > > > > > > this method may block beyond the
> > > > > > > * timeout in order to execute custom {@link
> > ConsumerRebalanceListener}
> > > > > > > callbacks.
> > > > > > >
> > > > > >
> > > > > > In other words: If the method returns before the timeout there
> > must be
> > > > > > records in the method result. After the timeout has passed there
> > may be no
> > > > > > records. It might block for longer than the timeout. So I think
> > returning
> > > > > > with empty records before at least the given timeout has passed
> > breaks that
> > > > > > contract.
> > > > > >
> > > > > > A not-much-prettier alternative to adding a new
> > > > > > pollForRecordsOrMetadata(Duration) method could be overloading
> > poll() to
> > > > > > take an additional parameter which controlled whether an early
> > return with
> > > > > > empty records was allowed. Or a `poll(PollOptions)`. In the long
> > run it
> > > > > > could be a mistake to include in the method name exactly what
> > might cause
> > > > > > an early empty return.
> > > > > >
> > > > > > Kind regards,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai <
> chia7...@apache.org>
> > wrote:
> > > > > >
> > > > > > > > Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern
> > > > > > > > to make assumptions about whether poll() returns data.
> > > > > > >
> > > > > > > > However, I checked all usages of poll() in the code base. There is an
> > > > > > > > interesting use case, poll(a bigger timeout), which implies that callers
> > > > > > > > want poll() to block (forever) unless data is available.
> > > > > > >
> > > > > > > [1]
> > > > > > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> > > > > > > [2]
> > > > > > >
> >
> https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
> > > > > > >
> > > > > > > > Hence, I am starting to worry that client code like the aforementioned
> > > > > > > > cases will be broken by the behavior change :(
> > > > > > >
> > > > > > > On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org>
> > wrote:
> > > > > > > > Thanks for your email John.
> > > > > > > >
> > > > > > > > > I agree that it seems to be an anti-pattern to write code that makes
> > > > > > > > > assumptions about whether poll() returns data or not. Thus, we should
> > > > > > > > > fix-forward the system test from my point of view.
> > > > > > > >
> > > > > > > > From my understanding, the impact of KIP-695 is that we might
> > return
> > > > > > > > early from poll() (ie, before the timeout passed) with no
> > data, only if
> > > > > > > > an empty fetch request comes back and there is no other fetch
> > request
> > > > > > > > that did return data. Thus, for most cases, poll() should
> > still return
> > > > > > > > early and provide data. -- Thus, I have no concerns with the
> > slight
> > > > > > > > behavior change.
> > > > > > > >
> > > > > > > > Would be good to get input from others about this question
> > though.
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > >
> > > > > > > > On 2/3/21 10:06 AM, John Roesler wrote:
> > > > > > > > > Hello again all,
> > > > > > > > >
> > > > > > > > > I'm resurrecting this thread to discuss an issue that has
> > > > > > > > > come up after merging the code for this KIP.
> > > > > > > > >
> > > > > > > > > The issue is that some of the system tests need to be
> > > > > > > > > updated in the same way that this integration test needed
> to
> > > > > > > > > be updated:
> > > > > > > > >
> > > > > > >
> >
> https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> > > > > > > > >
> > > > > > > > > This issue was reported here:
> > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-12268
> > > > > > > > > and there is some preliminary discussion here:
> > > > > > > > > https://github.com/apache/kafka/pull/10022
> > > > > > > > >
> > > > > > > > > First, let me offer my apologies for failing to catch this
> > > > > > > > > before the merge. I'm sorry that it became Rajini's work to
> > > > > > > > > track down the cause of the failure, when it was my
> > > > > > > > > responsibility to ensure the feature was merged safely.
> > > > > > > > >
> > > > > > > > > To recap the situation:
> > > > > > > > > Consumer#poll(Duration) will now return before the duration
> > > > > > > > > expires even if there are no records returned if there is
> > > > > > > > > some returned metadata.
> > > > > > > > >
> > > > > > > > > This behavior was important for KIP-695. In the situation
> > > > > > > > > where we get no records back for some partition, Streams
> > > > > > > > > needs to have the freshest possible information about
> > > > > > > > > > whether there are no new records on the broker, or whether
> > > > > > > > > there are records on the broker that we still need to
> fetch.
> > > > > > > > > If that's not clear, the KIP contains the full story.
> > > > > > > > >
> > > > > > > > > It's definitely a behavior change, but our rationale was
> > > > > > > > > that it's an acceptable behavior change. Our big
> alternative
> > > > > > > > > is to add a _new_ method to Consumer to
> > > > > > > > > pollForRecordsOrMetadata(Duration) or something.
> > > > > > > > >
> > > > > > > > > It seems unreliable to expect the broker to return a
> > > > > > > > > particular record within a particular timeout in general,
> > > > > > > > > which is what these tests are doing. The broker can decide
> > > > > > > > > for several reasons not to return data for a partition, but
> > > > > > > > > return data for another partition instead.
> > > > > > > > >
> > > > > > > > > It seems like the only case where you might reasonably try
> > > > > > > > > to rely on that is in a test, where you first write a
> record
> > > > > > > > > to a partition, then you assign only that one partition to
> a
> > > > > > > > > consumer, then you poll on the consumer, expecting it to
> > > > > > > > > return the data you just wrote.
> > > > > > > > >
> > > > > > > > > So the $10 question here is whether we should support this
> > > > > > > > > apparently artificial (testing-only) use case to the point
> > > > > > > > > where it's worth adding a whole new method to the Consumer
> > > > > > > > > interface.
> > > > > > > > >
> > > > > > > > > Thanks all,
> > > > > > > > > John
> > > > > > > > >
> > > > > > > > > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> > > > > > > > > > Thanks Jason,
> > > > > > > > > >
> > > > > > > > > > We would only return the metadata for the latest fetches.
> > > > > > > > > > So, if someone wanted to use this to lazily maintain a
> > > > > > > > > > client-side metadata map for all partitions, they'd have
> to
> > > > > > > > > > store it separately and merge in new updates as they
> > arrive.
> > > > > > > > > >
> > > > > > > > > > This way:
> > > > > > > > > > 1. We don't need to increase the complexity of the client
> > by
> > > > > > > > > > storing that metadata
> > > > > > > > > > 2. Users will be able to treat all returned metadata as
> > > > > > > > > > "fresh" without having to reason about the timestamps.
> > > > > > > > > > 3. All parts of the returned ConsumerRecords object have
> > the
> > > > > > > > > > same lifecycle: all the data and metadata are the results
> > of
> > > > > > > > > > the most recent round of fetch responses that had not
> been
> > > > > > > > > > previously polled.
> > > > > > > > > >
> > > > > > > > > > Does that seem sensible to you? I'll update the KIP to
> > > > > > > > > > clarify this.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> > > > > > > > > > > Hi John,
> > > > > > > > > > >
> > > > > > > > > > > Just one question. It wasn't very clear to me exactly
> > when the
> > > > > > > metadata
> > > > > > > > > > > would be returned in `ConsumerRecords`. Would we
> > /always/ include the
> > > > > > > > > > > metadata for all partitions that are assigned, or would
> > it be based
> > > > > > > on the
> > > > > > > > > > > latest fetches?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Jason
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler <
> > vvcep...@apache.org>
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks, Guozhang!
> > > > > > > > > > > >
> > > > > > > > > > > > All of your feedback sounds good to me. I’ll update
> > the KIP when I
> > > > > > > am able.
> > > > > > > > > > > >
> > > > > > > > > > > > 3) I believe it is the position after the fetch, but
> I
> > will
> > > > > > > confirm. I
> > > > > > > > > > > > think omitting position may render beginning and end
> > offsets
> > > > > > > useless as
> > > > > > > > > > > > well, which leaves only lag. That would be fine with
> > me, but it
> > > > > > > also seems
> > > > > > > > > > > > nice to supply this extra metadata since it is well
> > defined and
> > > > > > > probably
> > > > > > > > > > > > handy for others. Therefore, I’d go the route of
> > specifying the
> > > > > > > exact
> > > > > > > > > > > > semantics and keeping it.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the review,
> > > > > > > > > > > > John
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > > > > > > > > Hello John,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the updates! I've made a pass on the KIP
> > and also the
> > > > > > > POC PR,
> > > > > > > > > > > > > here are some minor comments:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the
> metadata
> > keep getting
> > > > > > > > > > > > updated,
> > > > > > > > > > > > > and we do not create a new object but just update
> > the values
> > > > > > > in-place, so
> > > > > > > > > > > > > > maybe calling it `lastUpdateTimestamp` is better?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2) It will be great to verify in javadocs that the
> > new API
> > > > > > > > > > > > > "ConsumerRecords#metadata(): Map<TopicPartition,
> > Metadata>" may
> > > > > > > return a
> > > > > > > > > > > > > superset of TopicPartitions than the existing API
> > that returns the
> > > > > > > data
> > > > > > > > > > > > by
> > > > > > > > > > > > > partitions, in case users assume their map
> > key-entries would
> > > > > > > always be
> > > > > > > > > > > > the
> > > > > > > > > > > > > same.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3) The "position()" API of the call needs better
> > clarification: is
> > > > > > > it the
> > > > > > > > > > > > > current position AFTER the records are returned, or
> > is it BEFORE
> > > > > > > the
> > > > > > > > > > > > > records are returned? Personally I'd suggest we do
> > not include it
> > > > > > > if it
> > > > > > > > > > > > is
> > > > > > > > > > > > > not used anywhere yet just to avoid possible
> > misuse, but I'm fine
> > > > > > > if you
> > > > > > > > > > > > > like to keep it still; in that case just clarify
> its
> > semantics.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Other than that, I'm +1 on the KIP as well!
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <
> > > > > > > wcarl...@confluent.io>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > walker
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <
> > br...@confluent.io
> > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the KIP, John!
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > Bruno
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > There hasn't been much discussion on KIP-695
> > so far, so I'd
> > > > > > > > > > > > > > > > like to go ahead and call for a vote.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > As a reminder, the purpose of KIP-695 is to
> > improve on the
> > > > > > > > > > > > > > > > "task idling" feature we introduced in
> > KIP-353. This KIP
> > > > > > > > > > > > > > > > will allow Streams to offer deterministic
> time
> > semantics in
> > > > > > > > > > > > > > > > join-type topologies. For example, it makes
> > sure that
> > > > > > > > > > > > > > > > > when you join two topics, we collate the
> > topics by
> > > > > > > > > > > > > > > > timestamp. That was always the intent with
> > task idling (KIP-
> > > > > > > > > > > > > > > > 353), but it turns out the previous mechanism
> > couldn't
> > > > > > > > > > > > > > > > provide the desired semantics.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The details are here:
> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/JSXZCQ
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > --
> > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > >
> > > > >
> > > > >
> >
> >
> >
>


-- 
-- Guozhang
