Thanks Guozhang.

1) Just for my confirmation, when you say we should proceed with the even
distribution of bytes, are you referring to the Proposed Solution in the
KIP or the option you had considered in the JIRA?
2) Default value for the config is something that I missed. I agree we
can't have really large values as it might be detrimental to
performance. Maybe, as a starting point, we could assume that only one
stream task is running and ask what the ideal value would be in that
scenario? Somewhere around 10MB, similar to the caching config?
3) When you say *a task level metric indicating the current totally
aggregated metrics*, do you mean the bytes aggregated at the task level?
4) I am ok with the name change, but would like to know others' thoughts.
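
For reference on points 1 and 2, here is a minimal sketch of what an even
split could look like, assuming the configured total is divided equally
among the input partitions owned by a single thread and that a partition is
paused once its buffered bytes exceed its share. The class and method names
are hypothetical and are not taken from the KIP or from Kafka Streams.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class InputBufferBudget {

    // Evenly split the configured total bytes across the partitions
    // assigned to one thread (assumed interpretation of "even distribution").
    static long perPartitionBytes(long totalBytes, int partitionCount) {
        if (totalBytes < 0 || partitionCount <= 0) {
            throw new IllegalArgumentException(
                "totalBytes must be >= 0 and partitionCount must be > 0");
        }
        return totalBytes / partitionCount;
    }

    // A partition would be paused once its buffered bytes exceed its share.
    static boolean shouldPause(long bufferedBytes, long budgetBytes) {
        return bufferedBytes > budgetBytes;
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024;            // the 10MB starting point floated above
        long budget = perPartitionBytes(total, 4); // four partitions, chosen arbitrarily
        System.out.println("per-partition budget: " + budget + " bytes");
        System.out.println("pause at 3MB buffered: "
            + shouldPause(3L * 1024 * 1024, budget));
    }
}
```

With a single task and a single partition the budget is simply the whole
configured value, which is why the one-task case seems like a reasonable
baseline for picking a default.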


On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <> wrote:

> Thanks Sagar for writing this PR.
> I thought twice about the options that have been proposed in
>, and feel that at the
> moment it's simpler to just do the even distribution of the configured
> total bytes. My rationale is that right now we have a static tasks ->
> threads mapping, and hence each partition would only be fetched by a single
> thread / consumer at a given time. If in the future we break that static
> mapping into dynamic mapping, then we would not be able to do this even
> distribution. Instead we would have other threads polling from the consumer
> only, and those threads would be responsible for checking the config and
> pausing non-empty partitions if the buffered bytes go beyond the threshold.
> But since at that time we would not change the config itself, only how it
> is implemented behind the scenes, we would not need another KIP to change it.
> Some more comments:
> 1. We need to discuss a bit about the default value of this new config.
> Personally I think we need to be a bit conservative with large values so
> that it does not cause any perf regression compared with the old configs,
> especially with a large topology and a large number of partitions.
> 2. I looked at the existing metrics, and we do not have corresponding sensors.
> How about also adding a task level metric indicating the current totally
> aggregated metrics? The reason I do not suggest this metric on the
> per-thread level is that in the future we may break the static mapping of
> tasks -> threads.
> [optional] As an orthogonal thought, I'm thinking maybe we can rename the
> other "*cache.max.bytes.buffering*" as "statestore.cache.max.bytes" (via
> deprecation of course), piggy-backed in this KIP? Would like to hear
> others' thoughts.
> Guozhang
> On Sun, Aug 22, 2021 at 9:29 AM Sagar <> wrote:
> > Hi All,
> >
> > I would like to start a discussion on the following KIP:
> >
> >
> > Thanks!
> > Sagar.
> >
> --
> -- Guozhang
