Hi All,

Bumping this thread again.

Thanks!
Sagar.

On Sat, Sep 11, 2021 at 2:04 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Mathias,
>
> I missed out on the metrics part.
>
> I have added the new metric in the proposed changes section along with the
> small re-wording that you talked about.
>
> Let me know if that makes sense.
>
> Thanks!
> Sagar.
>
> On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Thanks for the KIP.
>>
>> There was some discussion about adding a metric on the thread, but the
>> KIP does not contain anything about it. Did we drop this suggestion or
>> was the KIP not updated accordingly?
>>
>>
>> Nit:
>>
>> > This would be a global config applicable per processing topology
>>
>> Can we change this to `per Kafka Streams instance.`
>>
>> Atm, a Stream instance executes a single topology, so it does not make
>> any effective difference right now. However, it seems better (more
>> logical) to bind the config to the instance (not the topology the
>> instance executes).
>>
>>
>> -Matthias
>>
>> On 9/2/21 6:08 AM, Sagar wrote:
>> > Thanks Guozhang and Luke.
>> >
>> > I have updated the KIP with all the suggested changes.
>> >
>> > Do you think we could start voting for this?
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <show...@gmail.com> wrote:
>> >
>> >> Thanks for the KIP. Overall LGTM.
>> >>
>> >> Just one thought, if we "rename" the config directly as mentioned in
>> the
>> >> KIP, would that break existing applications?
>> >> Should we deprecate the old one first, and make the old/new names
>> co-exist
>> >> for some period of time?
>> >>
>> >> Public Interfaces
>> >>
>> >>    - Adding a new config *input.buffer.max.bytes *applicable at a
>> topology
>> >>    level. The importance of this config would be *Medium*.
>> >>    - Renaming *cache.max.bytes.buffering* to
>> *statestore.cache.max.bytes*.
>> >>
>> >>
>> >>
>> >> Thank you.
>> >> Luke
>> >>
>> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >>
>> >>> Currently the state store cache size default value is 10MB today,
>> which
>> >>> arguably is rather small. So I'm thinking maybe for this config
>> default
>> >> to
>> >>> 512MB.
>> >>>
>> >>> Other than that, LGTM.
>> >>>
>> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com>
>> >> wrote:
>> >>>
>> >>>> Thanks Guozhang and Sophie.
>> >>>>
>> >>>> Yeah a small default value would lower the throughput. I didn't quite
>> >>>> realise it earlier. It's slightly hard to predict this value so I
>> would
>> >>>> guess around 1/2 GB to 1 GB? WDYT?
>> >>>>
>> >>>> Regarding the renaming of the config and the new metric, sure would
>> >>> include
>> >>>> it in the KIP.
>> >>>>
>> >>>> Lastly, importance would also. be added. I guess Medium should be ok.
>> >>>>
>> >>>> Thanks!
>> >>>> Sagar.
>> >>>>
>> >>>>
>> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
>> >>>> <sop...@confluent.io.invalid> wrote:
>> >>>>
>> >>>>> 1) I agree that we should just distribute the bytes evenly, at least
>> >>> for
>> >>>>> now. It's simpler to understand and
>> >>>>> we can always change it later, plus it makes sense to keep this
>> >> aligned
>> >>>>> with how the cache works today
>> >>>>>
>> >>>>> 2) +1 to being conservative in the generous sense, it's just not
>> >>>> something
>> >>>>> we can predict with any degree
>> >>>>> of accuracy and even if we could, the appropriate value is going to
>> >>>> differ
>> >>>>> wildly across applications and use
>> >>>>> cases. We might want to just pick some multiple of the default cache
>> >>>> size,
>> >>>>> and maybe do some research on
>> >>>>> other relevant defaults or sizes (default JVM heap, size of
>> available
>> >>>>> memory in common hosts eg EC2
>> >>>>> instances, etc). We don't need to worry as much about erring on the
>> >>> side
>> >>>> of
>> >>>>> too big, since other configs like
>> >>>>> the max.poll.records will help somewhat to keep it from exploding.
>> >>>>>
>> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name
>> >> to
>> >>> be
>> >>>>> incredibly confusing. Deprecating this in
>> >>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the new
>> >>> input
>> >>>>> buffer config sounds good to me to include here.
>> >>>>>
>> >>>>> 5) The KIP should list all relevant public-facing changes, including
>> >>>>> metadata like the config's "Importance". Personally
>> >>>>> I would recommend Medium, or even High if we're really worried about
>> >>> the
>> >>>>> default being wrong for a lot of users
>> >>>>>
>> >>>>> Thanks for the KIP, besides those few things that Guozhang brought
>> up
>> >>> and
>> >>>>> the config importance, everything SGTM
>> >>>>>
>> >>>>> -Sophie
>> >>>>>
>> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the
>> >>>> configured
>> >>>>>> bytes among threads evenly.
>> >>>>>>
>> >>>>>> 2) I was actually thinking about making the default a large enough
>> >>>> value
>> >>>>> so
>> >>>>>> that we would not introduce performance regression: thinking about
>> >> a
>> >>>> use
>> >>>>>> case with many partitions and each record may be large, then
>> >>>> effectively
>> >>>>> we
>> >>>>>> would only start pausing when the total bytes buffered is pretty
>> >>> large.
>> >>>>> If
>> >>>>>> we set the default value to small, we would be "more aggressive" on
>> >>>>> pausing
>> >>>>>> which may impact throughput.
>> >>>>>>
>> >>>>>> 3) Yes exactly, this would naturally be at the "partition-group"
>> >>> class
>> >>>>>> since that represents the task's all input partitions.
>> >>>>>>
>> >>>>>> 4) This is just a bold thought, I'm interested to see other's
>> >>> thoughts.
>> >>>>>>
>> >>>>>>
>> >>>>>> Guozhang
>> >>>>>>
>> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com>
>> >>>> wrote:
>> >>>>>>
>> >>>>>>> Thanks Guozhang.
>> >>>>>>>
>> >>>>>>> 1) Just for my confirmation, when you say we should proceed with
>> >>> the
>> >>>>> even
>> >>>>>>> distribution of bytes, are you referring to the Proposed Solution
>> >>> in
>> >>>>> the
>> >>>>>>> KIP or the option you had considered in the JIRA?
>> >>>>>>> 2) Default value for the config is something that I missed. I
>> >> agree
>> >>>> we
>> >>>>>>> can't have really large values as it might be detrimental to the
>> >>>>>>> performance. Maybe, as a starting point, we assume that only 1
>> >>> Stream
>> >>>>>> Task
>> >>>>>>> is running so what could be the ideal value in such a scenario?
>> >>>>> Somewhere
>> >>>>>>> around 10MB similar to the caching config?
>> >>>>>>> 3) When you say,  *a task level metric indicating the current
>> >>> totally
>> >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task
>> >>> level?
>> >>>>>>> 4) I am ok with the name change, but would like to know others'
>> >>>>> thoughts.
>> >>>>>>>
>> >>>>>>> Thanks!
>> >>>>>>> Sagar.
>> >>>>>>>
>> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
>> >> wangg...@gmail.com
>> >>>>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks Sagar for writing this PR.
>> >>>>>>>>
>> >>>>>>>> I think twice about the options that have been proposed in
>> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
>> >> that
>> >>>> at
>> >>>>>> the
>> >>>>>>>> moment it's simpler to just do the even distribution of the
>> >>>>> configured
>> >>>>>>>> total bytes. My rationale is that right now we have a static
>> >>> tasks
>> >>>> ->
>> >>>>>>>> threads mapping, and hence each partition would only be fetched
>> >>> by
>> >>>> a
>> >>>>>>> single
>> >>>>>>>> thread / consumer at a given time. If in the future we break
>> >> that
>> >>>>>> static
>> >>>>>>>> mapping into dynamic mapping, then we would not be able to do
>> >>> this
>> >>>>> even
>> >>>>>>>> distribution. Instead we would have other threads polling from
>> >>>>> consumer
>> >>>>>>>> only, and those threads would be responsible for checking the
>> >>>> config
>> >>>>>> and
>> >>>>>>>> pause non-empty partitions if it goes beyond the threshold. But
>> >>>> since
>> >>>>>> at
>> >>>>>>>> that time we would not change the config but just how it would
>> >> be
>> >>>>>>>> implemented behind the scenes we would not need another KIP to
>> >>>> change
>> >>>>>> it.
>> >>>>>>>>
>> >>>>>>>> Some more comments:
>> >>>>>>>>
>> >>>>>>>> 1. We need to discuss a bit about the default value of this new
>> >>>>> config.
>> >>>>>>>> Personally I think we need to be a bit conservative with large
>> >>>> values
>> >>>>>> so
>> >>>>>>>> that it would not have any perf regression compared with old
>> >>>> configs
>> >>>>>>>> especially with large topology and large number of partitions.
>> >>>>>>>> 2. I looked at the existing metrics, and do not have
>> >>> corresponding
>> >>>>>>> sensors.
>> >>>>>>>> How about also adding a task level metric indicating the
>> >> current
>> >>>>>> totally
>> >>>>>>>> aggregated metrics. The reason I do not suggest this metric on
>> >>> the
>> >>>>>>>> per-thread level is that in the future we may break the static
>> >>>>> mapping
>> >>>>>> of
>> >>>>>>>> tasks -> threads.
>> >>>>>>>>
>> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
>> >>>> rename
>> >>>>>> the
>> >>>>>>>> other "*cache.max.bytes.buffering*" as
>> >>> "statestore.cache.max.bytes"
>> >>>>>> (via
>> >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to
>> >>>> hear
>> >>>>>>>> others' thoughts.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <
>> >> sagarmeansoc...@gmail.com
>> >>>>
>> >>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi All,
>> >>>>>>>>>
>> >>>>>>>>> I would like to start a discussion on the following KIP:
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
>> >>>>>>>>>
>> >>>>>>>>> Thanks!
>> >>>>>>>>> Sagar.
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>> -- Guozhang
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> --
>> >>>>>> -- Guozhang
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >>> --
>> >>> -- Guozhang
>> >>>
>> >>
>> >
>>
>

Reply via email to