1) I meant your proposed solution, i.e. to distribute the configured bytes
evenly among threads.
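
To make (1) concrete, here is a rough sketch of what that even split could
look like; all class and variable names below are made up for illustration,
not the actual Streams internals:

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical sketch only: each stream thread gets an even share of the
    // configured total, and pauses its non-empty partitions once its buffered
    // bytes exceed that share.
    public class EvenByteBudgetSketch {

        private final long perThreadBudget;

        public EvenByteBudgetSketch(final long totalMaxBytes, final int numStreamThreads) {
            this.perThreadBudget = totalMaxBytes / numStreamThreads;
        }

        public void maybePause(final Consumer<?, ?> consumer,
                               final long bytesBufferedByThisThread,
                               final Collection<TopicPartition> nonEmptyPartitions) {
            if (bytesBufferedByThisThread > perThreadBudget) {
                consumer.pause(nonEmptyPartitions);      // stop fetching until the buffer drains
            } else {
                consumer.resume(consumer.paused());      // back under budget, resume fetching
            }
        }
    }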

2) I was actually thinking about making the default a large enough value so
that we would not introduce a performance regression: consider a use case
with many partitions where each record may be large; effectively we would
only start pausing when the total bytes buffered is pretty large. If we set
the default value too small, we would be "more aggressive" about pausing,
which may impact throughput.
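
Whatever default we land on, a user who finds it too aggressive (or too lax)
for their workload could always override it. A sketch below; note that the
config name "input.buffer.max.bytes" and the 512 MB value are placeholders,
not the final name/value from the KIP:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class BufferConfigSketch {
        public static Properties streamsProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Placeholder config name and value; the actual name and default
            // are whatever the KIP finally defines.
            props.put("input.buffer.max.bytes", 512 * 1024 * 1024L);
            return props;
        }
    }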

3) Yes exactly, this would naturally live at the "partition-group" class,
since that class represents all of the task's input partitions.
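
For illustration only, the shape of such a task-level sensor using the plain
metrics API (the real one would of course be registered through the Streams
metrics layer, and the metric/group names here are made up):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Value;

    public class TaskBufferMetricSketch {
        public static void main(final String[] args) {
            final Metrics metrics = new Metrics();
            // Hypothetical metric/group names at the task (partition-group) level.
            final Sensor bufferedBytes = metrics.sensor("input-buffer-bytes-total");
            bufferedBytes.add(
                metrics.metricName("input-buffer-bytes-total",
                                   "stream-task-metrics",
                                   "Bytes currently buffered across the task's input partitions"),
                new Value());

            final long currentBufferedBytes = 42L * 1024 * 1024;  // example value
            bufferedBytes.record(currentBufferedBytes);           // update whenever the buffer changes
            metrics.close();
        }
    }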

4) This is just a bold thought; I'm interested to see others' thoughts.


Guozhang

On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansoc...@gmail.com> wrote:

> Thanks Guozhang.
>
> 1) Just for my confirmation, when you say we should proceed with the even
> distribution of bytes, are you referring to the Proposed Solution in the
> KIP or the option you had considered in the JIRA?
> 2) Default value for the config is something that I missed. I agree we
> can't have really large values as it might be detrimental to
> performance. Maybe, as a starting point, we assume that only 1 Stream Task
> is running, so what would be the ideal value in such a scenario? Somewhere
> around 10MB, similar to the caching config?
> 3) When you say *a task level metric indicating the current totally
> aggregated metrics*, you mean the bytes aggregated at a task level?
> 4) I am ok with the name change, but would like to know others' thoughts.
>
> Thanks!
> Sagar.
>
> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Thanks Sagar for writing this KIP.
> >
> > I thought twice about the options that have been proposed in
> > https://issues.apache.org/jira/browse/KAFKA-13152, and feel that at the
> > moment it's simpler to just do the even distribution of the configured
> > total bytes. My rationale is that right now we have a static tasks ->
> > threads mapping, and hence each partition would only be fetched by a
> > single thread / consumer at a given time. If in the future we break that
> > static mapping into a dynamic mapping, then we would no longer be able to
> > do this even distribution. Instead we would have other threads polling
> > from the consumer only, and those threads would be responsible for
> > checking the config and pausing non-empty partitions if the total goes
> > beyond the threshold. But since at that time we would not change the
> > config, only how it is implemented behind the scenes, we would not need
> > another KIP to change it.
> >
> > Some more comments:
> >
> > 1. We need to discuss a bit about the default value of this new config.
> > Personally I think we should be conservative and lean toward a large value
> > so that it would not cause any perf regression compared with the old
> > configs, especially with a large topology and a large number of partitions.
> > 2. I looked at the existing metrics, and we do not have corresponding
> > sensors. How about also adding a task-level metric indicating the current
> > total aggregated bytes? The reason I do not suggest this metric at the
> > per-thread level is that in the future we may break the static mapping of
> > tasks -> threads.
> >
> > [optional] As an orthogonal thought, I'm thinking maybe we can rename the
> > other "*cache.max.bytes.buffering*" to "statestore.cache.max.bytes" (via
> > deprecation of course), piggy-backed in this KIP? Would like to hear
> > others' thoughts.
> >
> >
> > Guozhang
> >
> >
> >
> > On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > > Hi All,
> > >
> > > I would like to start a discussion on the following KIP:
> > >
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > >
> > > Thanks!
> > > Sagar.
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
