Oh you already did -- missed the VOTE thread somehow. Voted :)

On Tue, Sep 7, 2021 at 6:27 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

> Yeah, feel free to kick off the vote
>
> On Thu, Sep 2, 2021 at 6:08 AM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Thanks Guozhang and Luke.
> >
> > I have updated the KIP with all the suggested changes.
> >
> > Do you think we could start voting for this?
> >
> > Thanks!
> > Sagar.
> >
> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <show...@gmail.com> wrote:
> >
> > > Thanks for the KIP. Overall LGTM.
> > >
> > > Just one thought, if we "rename" the config directly as mentioned in the
> > > KIP, would that break existing applications?
> > > Should we deprecate the old one first, and make the old/new names co-exist
> > > for some period of time?
> > >
> > > Public Interfaces
> > >
> > > - Adding a new config *input.buffer.max.bytes* applicable at a topology
> > >   level. The importance of this config would be *Medium*.
> > > - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Currently the state store cache size default value is 10MB today, which
> > > > arguably is rather small. So I'm thinking maybe for this config default to
> > > > 512MB.
> > > >
> > > > Other than that, LGTM.
> > > >
> > > > On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> > > >
> > > > > Thanks Guozhang and Sophie.
> > > > >
> > > > > Yeah a small default value would lower the throughput. I didn't quite
> > > > > realise it earlier. It's slightly hard to predict this value so I would
> > > > > guess around 1/2 GB to 1 GB? WDYT?
> > > > >
> > > > > Regarding the renaming of the config and the new metric, sure would
> > > > > include it in the KIP.
> > > > >
> > > > > Lastly, importance would also be added. I guess Medium should be ok.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
> > > > >
> > > > > > 1) I agree that we should just distribute the bytes evenly, at least for
> > > > > > now. It's simpler to understand and we can always change it later, plus
> > > > > > it makes sense to keep this aligned with how the cache works today
> > > > > >
> > > > > > 2) +1 to being conservative in the generous sense, it's just not something
> > > > > > we can predict with any degree of accuracy and even if we could, the
> > > > > > appropriate value is going to differ wildly across applications and use
> > > > > > cases. We might want to just pick some multiple of the default cache size,
> > > > > > and maybe do some research on other relevant defaults or sizes (default
> > > > > > JVM heap, size of available memory in common hosts eg EC2 instances, etc).
> > > > > > We don't need to worry as much about erring on the side of too big, since
> > > > > > other configs like the max.poll.records will help somewhat to keep it
> > > > > > from exploding.
> > > > > >
> > > > > > 4) 100%, I always found the *cache.max.bytes.buffering* config name to be
> > > > > > incredibly confusing. Deprecating this in favor of
> > > > > > "*statestore.cache.max.bytes*" and aligning it to the new input buffer
> > > > > > config sounds good to me to include here.
> > > > > >
> > > > > > 5) The KIP should list all relevant public-facing changes, including
> > > > > > metadata like the config's "Importance".
> > > > > > Personally I would recommend Medium, or even High if we're really worried
> > > > > > about the default being wrong for a lot of users
> > > > > >
> > > > > > Thanks for the KIP, besides those few things that Guozhang brought up and
> > > > > > the config importance, everything SGTM
> > > > > >
> > > > > > -Sophie
> > > > > >
> > > > > > On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > 1) I meant for your proposed solution. I.e. to distribute the configured
> > > > > > > bytes among threads evenly.
> > > > > > >
> > > > > > > 2) I was actually thinking about making the default a large enough value
> > > > > > > so that we would not introduce performance regression: thinking about a
> > > > > > > use case with many partitions and each record may be large, then
> > > > > > > effectively we would only start pausing when the total bytes buffered is
> > > > > > > pretty large. If we set the default value to small, we would be "more
> > > > > > > aggressive" on pausing which may impact throughput.
> > > > > > >
> > > > > > > 3) Yes exactly, this would naturally be at the "partition-group" class
> > > > > > > since that represents the task's all input partitions.
> > > > > > >
> > > > > > > 4) This is just a bold thought, I'm interested to see other's thoughts.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks Guozhang.
> > > > > > > >
> > > > > > > > 1) Just for my confirmation, when you say we should proceed with the
> > > > > > > > even distribution of bytes, are you referring to the Proposed Solution
> > > > > > > > in the KIP or the option you had considered in the JIRA?
> > > > > > > > 2) Default value for the config is something that I missed. I agree we
> > > > > > > > can't have really large values as it might be detrimental to the
> > > > > > > > performance. Maybe, as a starting point, we assume that only 1 Stream
> > > > > > > > Task is running so what could be the ideal value in such a scenario?
> > > > > > > > Somewhere around 10MB similar to the caching config?
> > > > > > > > 3) When you say, *a task level metric indicating the current totally
> > > > > > > > aggregated metrics*, you mean the bytes aggregated at a task level?
> > > > > > > > 4) I am ok with the name change, but would like to know others'
> > > > > > > > thoughts.
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > Sagar.
> > > > > > > >
> > > > > > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks Sagar for writing this PR.
> > > > > > > > >
> > > > > > > > > I think twice about the options that have been proposed in
> > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel that at the
> > > > > > > > > moment it's simpler to just do the even distribution of the configured
> > > > > > > > > total bytes. My rationale is that right now we have a static tasks ->
> > > > > > > > > threads mapping, and hence each partition would only be fetched by a
> > > > > > > > > single thread / consumer at a given time. If in the future we break that
> > > > > > > > > static mapping into dynamic mapping, then we would not be able to do
> > > > > > > > > this even distribution.
> > > > > > > > > Instead we would have other threads polling from consumer only, and
> > > > > > > > > those threads would be responsible for checking the config and pause
> > > > > > > > > non-empty partitions if it goes beyond the threshold. But since at that
> > > > > > > > > time we would not change the config but just how it would be implemented
> > > > > > > > > behind the scenes we would not need another KIP to change it.
> > > > > > > > >
> > > > > > > > > Some more comments:
> > > > > > > > >
> > > > > > > > > 1. We need to discuss a bit about the default value of this new config.
> > > > > > > > > Personally I think we need to be a bit conservative with large values so
> > > > > > > > > that it would not have any perf regression compared with old configs
> > > > > > > > > especially with large topology and large number of partitions.
> > > > > > > > > 2. I looked at the existing metrics, and do not have corresponding
> > > > > > > > > sensors. How about also adding a task level metric indicating the
> > > > > > > > > current totally aggregated metrics. The reason I do not suggest this
> > > > > > > > > metric on the per-thread level is that in the future we may break the
> > > > > > > > > static mapping of tasks -> threads.
> > > > > > > > >
> > > > > > > > > [optional] As an orthogonal thought, I'm thinking maybe we can rename
> > > > > > > > > the other "*cache.max.bytes.buffering*" as "statestore.cache.max.bytes"
> > > > > > > > > (via deprecation of course), piggy-backed in this KIP? Would like to
> > > > > > > > > hear others' thoughts.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi All,
> > > > > > > > > >
> > > > > > > > > > I would like to start a discussion on the following KIP:
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > Sagar.
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
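
For readers following the "even distribution" idea discussed in the thread, here is a minimal sketch of how it might look: the configured total is split evenly across stream threads, and a thread pauses its non-empty partitions once its buffered bytes go beyond its share. The class InputBufferBudget and all of its methods are hypothetical names made up for this illustration; they are not Kafka Streams APIs, and the actual KIP implementation may well differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not a Kafka Streams class.
final class InputBufferBudget {
    private final long perThreadLimitBytes;
    // Bytes currently buffered per input partition owned by this thread.
    private final Map<String, Long> bufferedBytes = new LinkedHashMap<>();

    InputBufferBudget(long totalMaxBytes, int numStreamThreads) {
        if (totalMaxBytes < 0 || numStreamThreads <= 0) {
            throw new IllegalArgumentException(
                "totalMaxBytes must be >= 0 and numStreamThreads must be > 0");
        }
        // Even split of the configured total across stream threads.
        this.perThreadLimitBytes = totalMaxBytes / numStreamThreads;
    }

    long perThreadLimitBytes() {
        return perThreadLimitBytes;
    }

    // Called when records are added to a partition's buffer.
    void recordBuffered(String partition, long bytes) {
        bufferedBytes.merge(partition, bytes, Long::sum);
    }

    // Called when records are processed and leave the buffer (never drops below zero).
    void recordProcessed(String partition, long bytes) {
        bufferedBytes.computeIfPresent(partition, (p, current) -> Math.max(0L, current - bytes));
    }

    long totalBufferedBytes() {
        long total = 0L;
        for (long b : bufferedBytes.values()) {
            total += b;
        }
        return total;
    }

    // Non-empty partitions to pause once this thread exceeds its share; empty list otherwise.
    List<String> partitionsToPause() {
        List<String> toPause = new ArrayList<>();
        if (totalBufferedBytes() <= perThreadLimitBytes) {
            return toPause;
        }
        for (Map.Entry<String, Long> e : bufferedBytes.entrySet()) {
            if (e.getValue() > 0L) {
                toPause.add(e.getKey());
            }
        }
        return toPause;
    }
}
```

With the 512MB total floated in the thread and, say, 4 stream threads, each thread would get a 128MB share under this sketch. In a real application the returned partitions would presumably be handed to the consumer's pause call, but that wiring is left out here.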
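
On Luke's question about letting the old and new config names co-exist during a deprecation period, one possible way to picture it is a lookup that prefers the new name and falls back to the deprecated one. The sketch below is only an assumption about how such a fallback could behave: CacheSizeConfigResolver is a made-up class, the precedence (new name wins) is not stated anywhere in this thread, and the 10MB default is simply the current cache default mentioned above.

```java
import java.util.Map;

// Hypothetical resolver for illustration only; not how Kafka Streams itself reads configs.
final class CacheSizeConfigResolver {
    static final String NEW_NAME = "statestore.cache.max.bytes";
    static final String DEPRECATED_NAME = "cache.max.bytes.buffering";
    // 10MB, the current cache default mentioned in this thread.
    static final long DEFAULT_BYTES = 10L * 1024 * 1024;

    // Assumed precedence: new name first, then the deprecated name, then the default.
    static long resolve(Map<String, ?> configs) {
        Object value = configs.get(NEW_NAME);
        if (value == null) {
            value = configs.get(DEPRECATED_NAME);
        }
        return value == null ? DEFAULT_BYTES : Long.parseLong(value.toString().trim());
    }
}
```

Under this sketch an existing application that only sets cache.max.bytes.buffering keeps working unchanged, which is the compatibility concern raised in the thread.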