Thanks for the KIP. Overall LGTM.

Just one thought, if we "rename" the config directly as mentioned in the
KIP, would that break existing applications?
Should we deprecate the old one first, and make the old/new names co-exist
for some period of time?

Public Interfaces

   - Adding a new config *input.buffer.max.bytes *applicable at a topology
   level. The importance of this config would be *Medium*.
   - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.



Thank you.
Luke

On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Currently the state store cache size default value is 10MB today, which
> arguably is rather small. So I'm thinking maybe for this config default to
> 512MB.
>
> Other than that, LGTM.
>
> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Thanks Guozhang and Sophie.
> >
> > Yeah a small default value would lower the throughput. I didn't quite
> > realise it earlier. It's slightly hard to predict this value so I would
> > guess around 1/2 GB to 1 GB? WDYT?
> >
> > Regarding the renaming of the config and the new metric, sure would
> include
> > it in the KIP.
> >
> > Lastly, importance would also. be added. I guess Medium should be ok.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > 1) I agree that we should just distribute the bytes evenly, at least
> for
> > > now. It's simpler to understand and
> > > we can always change it later, plus it makes sense to keep this aligned
> > > with how the cache works today
> > >
> > > 2) +1 to being conservative in the generous sense, it's just not
> > something
> > > we can predict with any degree
> > > of accuracy and even if we could, the appropriate value is going to
> > differ
> > > wildly across applications and use
> > > cases. We might want to just pick some multiple of the default cache
> > size,
> > > and maybe do some research on
> > > other relevant defaults or sizes (default JVM heap, size of available
> > > memory in common hosts eg EC2
> > > instances, etc). We don't need to worry as much about erring on the
> side
> > of
> > > too big, since other configs like
> > > the max.poll.records will help somewhat to keep it from exploding.
> > >
> > > 4) 100%, I always found the *cache.max.bytes.buffering* config name to
> be
> > > incredibly confusing. Deprecating this in
> > > favor of "*statestore.cache.max.bytes*" and aligning it to the new
> input
> > > buffer config sounds good to me to include here.
> > >
> > > 5) The KIP should list all relevant public-facing changes, including
> > > metadata like the config's "Importance". Personally
> > > I would recommend Medium, or even High if we're really worried about
> the
> > > default being wrong for a lot of users
> > >
> > > Thanks for the KIP, besides those few things that Guozhang brought up
> and
> > > the config importance, everything SGTM
> > >
> > > -Sophie
> > >
> > > On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > 1) I meant for your proposed solution. I.e. to distribute the
> > configured
> > > > bytes among threads evenly.
> > > >
> > > > 2) I was actually thinking about making the default a large enough
> > value
> > > so
> > > > that we would not introduce performance regression: thinking about a
> > use
> > > > case with many partitions and each record may be large, then
> > effectively
> > > we
> > > > would only start pausing when the total bytes buffered is pretty
> large.
> > > If
> > > > we set the default value to small, we would be "more aggressive" on
> > > pausing
> > > > which may impact throughput.
> > > >
> > > > 3) Yes exactly, this would naturally be at the "partition-group"
> class
> > > > since that represents the task's all input partitions.
> > > >
> > > > 4) This is just a bold thought, I'm interested to see other's
> thoughts.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks Guozhang.
> > > > >
> > > > > 1) Just for my confirmation, when you say we should proceed with
> the
> > > even
> > > > > distribution of bytes, are you referring to the Proposed Solution
> in
> > > the
> > > > > KIP or the option you had considered in the JIRA?
> > > > > 2) Default value for the config is something that I missed. I agree
> > we
> > > > > can't have really large values as it might be detrimental to the
> > > > > performance. Maybe, as a starting point, we assume that only 1
> Stream
> > > > Task
> > > > > is running so what could be the ideal value in such a scenario?
> > > Somewhere
> > > > > around 10MB similar to the caching config?
> > > > > 3) When you say,  *a task level metric indicating the current
> totally
> > > > > aggregated metrics, * you mean the bytes aggregated at a task
> level?
> > > > > 4) I am ok with the name change, but would like to know others'
> > > thoughts.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Thanks Sagar for writing this PR.
> > > > > >
> > > > > > I think twice about the options that have been proposed in
> > > > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel that
> > at
> > > > the
> > > > > > moment it's simpler to just do the even distribution of the
> > > configured
> > > > > > total bytes. My rationale is that right now we have a static
> tasks
> > ->
> > > > > > threads mapping, and hence each partition would only be fetched
> by
> > a
> > > > > single
> > > > > > thread / consumer at a given time. If in the future we break that
> > > > static
> > > > > > mapping into dynamic mapping, then we would not be able to do
> this
> > > even
> > > > > > distribution. Instead we would have other threads polling from
> > > consumer
> > > > > > only, and those threads would be responsible for checking the
> > config
> > > > and
> > > > > > pause non-empty partitions if it goes beyond the threshold. But
> > since
> > > > at
> > > > > > that time we would not change the config but just how it would be
> > > > > > implemented behind the scenes we would not need another KIP to
> > change
> > > > it.
> > > > > >
> > > > > > Some more comments:
> > > > > >
> > > > > > 1. We need to discuss a bit about the default value of this new
> > > config.
> > > > > > Personally I think we need to be a bit conservative with large
> > values
> > > > so
> > > > > > that it would not have any perf regression compared with old
> > configs
> > > > > > especially with large topology and large number of partitions.
> > > > > > 2. I looked at the existing metrics, and do not have
> corresponding
> > > > > sensors.
> > > > > > How about also adding a task level metric indicating the current
> > > > totally
> > > > > > aggregated metrics. The reason I do not suggest this metric on
> the
> > > > > > per-thread level is that in the future we may break the static
> > > mapping
> > > > of
> > > > > > tasks -> threads.
> > > > > >
> > > > > > [optional] As an orthogonal thought, I'm thinking maybe we can
> > rename
> > > > the
> > > > > > other "*cache.max.bytes.buffering*" as
> "statestore.cache.max.bytes"
> > > > (via
> > > > > > deprecation of course), piggy-backed in this KIP? Would like to
> > hear
> > > > > > others' thoughts.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansoc...@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I would like to start a discussion on the following KIP:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to