Thanks for providing more details.

Adding a config might be the path of least resistance... I am fine with that.

-Matthias
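
For readers skimming the thread, here is a minimal sketch of how the two modes of the `long_poll.mode` config proposed in the quoted message below would differ. It is only a model of the described semantics in plain Java, not Kafka client code: the class `LongPollSketch`, the enum `Mode`, and the method `mayReturn` are hypothetical names, and the config itself is still a proposal at this point in the discussion.

```java
import java.time.Duration;

// Hypothetical model of the proposed long_poll.mode semantics; not Kafka API.
final class LongPollSketch {

    // Mirrors the two proposed config values.
    enum Mode { RETURN_ON_RECORDS, RETURN_ON_RESPONSE }

    /**
     * Decides whether a poll(timeout) call may return at this moment.
     *
     * @param hasRecords            at least one record has been received
     * @param fetchResponseReceived some fetch response (possibly empty) arrived
     * @param elapsed               time spent inside poll() so far
     * @param timeout               the timeout passed to poll()
     */
    static boolean mayReturn(Mode mode, boolean hasRecords,
                             boolean fetchResponseReceived,
                             Duration elapsed, Duration timeout) {
        if (elapsed.compareTo(timeout) >= 0) {
            return true; // timeout expired: return, possibly with no records
        }
        if (hasRecords) {
            return true; // both modes return early once records are available
        }
        // Only return_on_response returns early on a response without records.
        return mode == Mode.RETURN_ON_RESPONSE && fetchResponseReceived;
    }
}
```

Under this model, an empty fetch response that arrives one second into a ten-second poll ends the call only in `RETURN_ON_RESPONSE` mode; in `RETURN_ON_RECORDS` mode the call keeps blocking, which is the behavior callers relied on before KIP-695.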

On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
>> vvvvvvvvvvvvvvvvvvv
>> long_poll.mode: return_on_records|return_on_response
> 
> This idea LGTM. It not only minimizes changes to the current behavior but
> also works for KIP-695.
> 
> On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote: 
>> Hi Matthias, Chia-Ping, and Tom,
>>
>> Thanks for the thoughtful replies!
>>
>> Re: poll(~forever~) to block indefinitely on records:
>> Thanks for your diligence, Chia-Ping. While I wouldn't
>> personally recommend for anyone to write code that blocks
>> forever on I/O, I do agree this is something that "real
>> people" may want to do.
>>
>> Just a note for the record, this approach should only be
>> used in conjunction with a manual assignment. If people are
>> using a group subscription, they're setting themselves up to
>> get kicked out of the group when there is low volume of
>> updates on the topic. And then, when they get kicked out,
>> they will never know it because they're just going to be
>> blocked in `poll()` the whole time.
>>
>> However, if you don't participate in a group and just:
>> 1 assign(partitions)
>> 2 poll(forever),
>> you should indeed expect to return from poll only when you
>> have records.
>>
>> This possibility is the turning point for me. I'd like to
>> alter my proposal to an opt-in config, detailed below.
>>
>> Re: Javadoc:
>> Thanks for pointing that out. It does seem like, if we do
>> decide to change behavior, we should adjust the Javadoc to
>> say so. That was an oversight on my part, and I daresay that
>> if I had done that initially, it would have saved Rajini
>> from having to dig into the code to pinpoint the cause of
>> those test failures.
>>
>> Re: PollOptions:
>> I actually like this option quite a bit. It seems like this
>> would be warranted if we expect someone to want to use the
>> same Consumer instance in both "return on metadata or
>> records" and "return on only records" mode. Otherwise, we
>> might as well introduce a new config.
>>
>> It also seems like the behavior I proposed in this KIP is
>> somewhat "advanced", so I could certainly see leaving it off
>> by default and offering an opt-in config.
>>
>> How does everyone feel about this opt-in config:
>>
>> vvvvvvvvvvvvvvvvvvv
>> long_poll.mode: return_on_records|return_on_response
>>
>> doc:
>> * return_on_records: (default) a call to
>> Consumer#poll(timeout) will block up to the timeout and
>> return early if records are received.
>> * return_on_response: a call to Consumer#poll(timeout) will
>> block up to the timeout and return early if any fetch
>> response is received. Use this option to get updates from
>> Consumer#metadata() even if Consumer#records() is empty.
>> ^^^^^^^^^^^^^^^^^^^
>>
>> Thanks,
>> John
>>
>> On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
>>> Hi,
>>>
>>> The Javadoc for KafkaConsumer#poll() includes the following:
>>>
>>>> * This method returns immediately if there are records available.
>>>> *Otherwise, it will await the passed timeout.*
>>>> * If the timeout expires, an empty record set will be returned. Note that
>>>> * this method may block beyond the timeout in order to execute custom
>>>> * {@link ConsumerRebalanceListener} callbacks.
>>>>
>>>
>>> In other words: If the method returns before the timeout there must be
>>> records in the method result. After the timeout has passed there may be no
>>> records. It might block for longer than the timeout. So I think returning
>>> with empty records before at least the given timeout has passed breaks that
>>> contract.
>>>
>>> A not-much-prettier alternative to adding a new
>>> pollForRecordsOrMetadata(Duration) method could be overloading poll() to
>>> take an additional parameter which controlled whether an early return with
>>> empty records was allowed. Or a `poll(PollOptions)`. In the long run it
>>> could be a mistake to include in the method name exactly what might cause
>>> an early empty return.
>>>
>>> Kind regards,
>>>
>>> Tom
>>>
>>>
>>> On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai <chia7...@apache.org> wrote:
>>>
>>>> Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern
>>>> to assume whether poll() returns data or not.
>>>>
>>>> However, I checked all usages of poll() in the code base. There is an
>>>> interesting use case, poll(a bigger timeout), which implies that callers
>>>> want poll() to block (forever) unless data is available.
>>>>
>>>> [1]
>>>> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
>>>> [2]
>>>> https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
>>>>
>>>> Hence, I am starting to worry that client code like the aforementioned
>>>> cases will be broken by the behavior change :(
>>>>
>>>> On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org> wrote:
>>>>> Thanks for your email John.
>>>>>
>>>>> I agree that it seems to be an anti-pattern to write code that makes
>>>>> assumptions about whether poll() returns data or not. Thus, we should
>>>>> fix-forward the system test from my point of view.
>>>>>
>>>>> From my understanding, the impact of KIP-695 is that we might return
>>>>> early from poll() (i.e., before the timeout passed) with no data, only if
>>>>> an empty fetch request comes back and there is no other fetch request
>>>>> that did return data. Thus, for most cases, poll() should still return
>>>>> early and provide data. -- Thus, I have no concerns with the slight
>>>>> behavior change.
>>>>>
>>>>> Would be good to get input from others about this question though.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 2/3/21 10:06 AM, John Roesler wrote:
>>>>>> Hello again all,
>>>>>>
>>>>>> I'm resurrecting this thread to discuss an issue that has
>>>>>> come up after merging the code for this KIP.
>>>>>>
>>>>>> The issue is that some of the system tests need to be
>>>>>> updated in the same way that this integration test needed to
>>>>>> be updated:
>>>>>>
>>>>>> https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
>>>>>>
>>>>>> This issue was reported here:
>>>>>> https://issues.apache.org/jira/browse/KAFKA-12268
>>>>>> and there is some preliminary discussion here:
>>>>>> https://github.com/apache/kafka/pull/10022
>>>>>>
>>>>>> First, let me offer my apologies for failing to catch this
>>>>>> before the merge. I'm sorry that it became Rajini's work to
>>>>>> track down the cause of the failure, when it was my
>>>>>> responsibility to ensure the feature was merged safely.
>>>>>>
>>>>>> To recap the situation:
>>>>>> Consumer#poll(Duration) will now return before the duration
>>>>>> expires even if there are no records returned if there is
>>>>>> some returned metadata.
>>>>>>
>>>>>> This behavior was important for KIP-695. In the situation
>>>>>> where we get no records back for some partition, Streams
>>>>>> needs to have the freshest possible information about
>>>>>> whether there are no new records on the broker, or whether
>>>>>> there are records on the broker that we still need to fetch.
>>>>>> If that's not clear, the KIP contains the full story.
>>>>>>
>>>>>> It's definitely a behavior change, but our rationale was
>>>>>> that it's an acceptable behavior change. Our big alternative
>>>>>> is to add a _new_ method to Consumer to
>>>>>> pollForRecordsOrMetadata(Duration) or something.
>>>>>>
>>>>>> It seems unreliable to expect the broker to return a
>>>>>> particular record within a particular timeout in general,
>>>>>> which is what these tests are doing. The broker can decide
>>>>>> for several reasons not to return data for a partition, but
>>>>>> return data for another partition instead.
>>>>>>
>>>>>> It seems like the only case where you might reasonably try
>>>>>> to rely on that is in a test, where you first write a record
>>>>>> to a partition, then you assign only that one partition to a
>>>>>> consumer, then you poll on the consumer, expecting it to
>>>>>> return the data you just wrote.
>>>>>>
>>>>>> So the $10 question here is whether we should support this
>>>>>> apparently artificial (testing-only) use case to the point
>>>>>> where it's worth adding a whole new method to the Consumer
>>>>>> interface.
>>>>>>
>>>>>> Thanks all,
>>>>>> John
>>>>>>
>>>>>> On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
>>>>>>> Thanks Jason,
>>>>>>>
>>>>>>> We would only return the metadata for the latest fetches.
>>>>>>> So, if someone wanted to use this to lazily maintain a
>>>>>>> client-side metadata map for all partitions, they'd have to
>>>>>>> store it separately and merge in new updates as they arrive.
>>>>>>>
>>>>>>> This way:
>>>>>>> 1. We don't need to increase the complexity of the client by
>>>>>>> storing that metadata
>>>>>>> 2. Users will be able to treat all returned metadata as
>>>>>>> "fresh" without having to reason about the timestamps.
>>>>>>> 3. All parts of the returned ConsumerRecords object have the
>>>>>>> same lifecycle: all the data and metadata are the results of
>>>>>>> the most recent round of fetch responses that had not been
>>>>>>> previously polled.
>>>>>>>
>>>>>>> Does that seem sensible to you? I'll update the KIP to
>>>>>>> clarify this.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
>>>>>>>> Hi John,
>>>>>>>>
>>>>>>>> Just one question. It wasn't very clear to me exactly when the metadata
>>>>>>>> would be returned in `ConsumerRecords`. Would we /always/ include the
>>>>>>>> metadata for all partitions that are assigned, or would it be based on the
>>>>>>>> latest fetches?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jason
>>>>>>>>
>>>>>>>> On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Guozhang!
>>>>>>>>>
>>>>>>>>> All of your feedback sounds good to me. I’ll update the KIP when I am able.
>>>>>>>>>
>>>>>>>>> 3) I believe it is the position after the fetch, but I will confirm. I
>>>>>>>>> think omitting position may render beginning and end offsets useless as
>>>>>>>>> well, which leaves only lag. That would be fine with me, but it also seems
>>>>>>>>> nice to supply this extra metadata since it is well defined and probably
>>>>>>>>> handy for others. Therefore, I’d go the route of specifying the exact
>>>>>>>>> semantics and keeping it.
>>>>>>>>>
>>>>>>>>> Thanks for the review,
>>>>>>>>> John
>>>>>>>>>
>>>>>>>>> On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
>>>>>>>>>> Hello John,
>>>>>>>>>>
>>>>>>>>>> Thanks for the updates! I've made a pass on the KIP and also the POC PR,
>>>>>>>>>> here are some minor comments:
>>>>>>>>>>
>>>>>>>>>> 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated,
>>>>>>>>>> and we do not create a new object but just update the values in-place, so
>>>>>>>>>> maybe calling it `lastUpdateTimestamp` is better?
>>>>>>>>>>
>>>>>>>>>> 2) It would be great to state in the Javadoc that the new API
>>>>>>>>>> "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
>>>>>>>>>> superset of the TopicPartitions returned by the existing API that returns
>>>>>>>>>> the data by partitions, in case users assume their map key-entries would
>>>>>>>>>> always be the same.
>>>>>>>>>>
>>>>>>>>>> 3) The "position()" API of the call needs better clarification: is it the
>>>>>>>>>> current position AFTER the records are returned, or is it BEFORE the
>>>>>>>>>> records are returned? Personally I'd suggest we do not include it if it is
>>>>>>>>>> not used anywhere yet, just to avoid possible misuse, but I'm fine if you
>>>>>>>>>> would like to keep it; in that case just clarify its semantics.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Other than that, I'm +1 on the KIP as well!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP!
>>>>>>>>>>>
>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>
>>>>>>>>>>> walker
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP, John!
>>>>>>>>>>>>
>>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Bruno
>>>>>>>>>>>>
>>>>>>>>>>>> On 08.12.20 18:03, John Roesler wrote:
>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> There hasn't been much discussion on KIP-695 so far, so I'd
>>>>>>>>>>>>> like to go ahead and call for a vote.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As a reminder, the purpose of KIP-695 is to improve on the
>>>>>>>>>>>>> "task idling" feature we introduced in KIP-353. This KIP
>>>>>>>>>>>>> will allow Streams to offer deterministic time semantics in
>>>>>>>>>>>>> join-type topologies. For example, it makes sure that
>>>>>>>>>>>>> when you join two topics, we collate the topics by
>>>>>>>>>>>>> timestamp. That was always the intent with task idling
>>>>>>>>>>>>> (KIP-353), but it turns out the previous mechanism couldn't
>>>>>>>>>>>>> provide the desired semantics.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The details are here:
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/x/JSXZCQ
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>
>>
>>
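
A minimal sketch of the client-side bookkeeping John describes in his 2020-12-17 reply quoted above: since only the metadata from the latest fetches is returned, a caller who wants a view of all partitions stores it separately and merges in updates as they arrive. The class `MetadataCacheSketch`, the string partition keys, and the single lag value per partition are illustrative assumptions, not the actual `ConsumerRecords#metadata()` types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical caller-side cache; the consumer itself keeps no such map.
final class MetadataCacheSketch {

    // Last known lag per partition, keyed by an assumed "topic-partition" string.
    private final Map<String, Long> lagByPartition = new HashMap<>();

    /**
     * Merges the metadata returned by the most recent poll.
     * Partitions absent from this round keep their previously stored value.
     */
    void merge(Map<String, Long> latest) {
        lagByPartition.putAll(latest);
    }

    /** Returns a copy of the accumulated view across all partitions seen so far. */
    Map<String, Long> snapshot() {
        return new HashMap<>(lagByPartition);
    }
}
```

Values in such a cache can be stale for partitions that were not part of the latest round of fetch responses, which is why the thread prefers to return only fresh metadata and leave any merging to the caller.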
