[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401763#comment-17401763
 ] 

Sagar Rao commented on KAFKA-13152:
---

Thanks [~mjsax]/[~guozhang]. I will start writing a KIP and send out an email 
soon.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to buffer per partition; when it is exceeded, we pause 
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401318#comment-17401318
 ] 

Guozhang Wang commented on KAFKA-13152:
---

I think it's a good idea to continue our discussion on the KIP. I'm not 
strongly suggesting one option over the other, and maybe I'm over-thinking by 
trying to prepare for a future where the task -> thread mapping is no longer 
static, which would not happen yet.



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401302#comment-17401302
 ] 

Matthias J. Sax commented on KAFKA-13152:
-

[~guozhang] – your proposal to only pause partitions with non-empty buffers 
SGTM; about fairness, I was not sure if we can/should rely on the consumer, but 
if you think it's the right way to go, I am ok with it. Just wanted to raise 
the question to ensure that we make a conscious decision.

[~sagarrao] – I think we should take the discussion into the KIP? It seems the 
scope is clear now, and we have two proposals: dividing the given buffer size 
across threads (or maybe even tasks etc.), or following Guozhang's proposal. It 
seems your concern is similar to my concern about fairness. Guozhang pointed 
out that we should achieve fairness within a thread (due to consumer 
round-robin fetching), but I guess your point is a good one: it's unclear if we 
achieve fairness across threads? [~guozhang] WDYT about this?

In the end the question seems to be whether we can/should try to keep it simple 
vs. how complex we want to design the algorithm. Personally, I am afraid of 
premature optimization and think keeping it simple might be the better way to 
get started.

It might be best if you start to work on a KIP, explain the pros/cons of both 
approaches, and put one into the "rejected alternatives" section, and we can 
discuss on the mailing list?



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401034#comment-17401034
 ] 

Sagar Rao commented on KAFKA-13152:
---

[~mjsax], [~guozhang] I had a slightly different thought whereby we can 
provision it as a global setting and then distribute it, similar to how 
`cache.max.bytes.buffering` works atm. 

So, assuming we have T threads and C is the max buffer size, each thread gets 
C/T bytes.

Once the individual StreamThreads have been assigned their respective shares of 
bytes, we can look at the total bytes consumed across tasks for that thread 
and, if it exceeds its share, pause the partitions.

We can go one step further and assign each task its share of bytes (by dividing 
equally), and finally bytes per partition (similar to the current per-partition 
records config, but considering bytes, by further equal division), but that's 
just extra complexity, so it can be ignored.
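
The per-thread split described above could be sketched as follows. This is a 
hypothetical sketch; `BufferShare` and its method names are illustrative, not 
Kafka Streams APIs:

```java
// Hypothetical sketch: divide a global byte budget C evenly across T stream
// threads, then check a thread's aggregate buffered bytes against its share.
public class BufferShare {

    // Each of the T threads gets C / T bytes of the global budget.
    public static long perThreadShare(long totalBufferBytes, int numThreads) {
        return totalBufferBytes / numThreads;
    }

    // Pause fetching for this thread's partitions once its tasks' combined
    // buffered bytes exceed the thread's share.
    public static boolean shouldPause(long bufferedBytesAcrossTasks, long threadShare) {
        return bufferedBytesAcrossTasks > threadShare;
    }

    public static void main(String[] args) {
        // Example: a 64 MiB global budget split across 4 threads.
        long share = perThreadShare(64L * 1024 * 1024, 4);
        System.out.println("per-thread share = " + share + " bytes");
    }
}
```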

Where assigning C/T bytes among threads might be beneficial, compared to the 
option suggested by Guozhang here:

 *instead we just monitor the aggregated total bytes across all tasks within 
the instance, when it has been reached* 

is this: let's say there is more than one stream thread in an instance, and 
only one of them is exceeding its bounds individually. Because we are looking 
at the overall byte count across all tasks, the other threads would also end up 
paying the penalty and be paused. If users provision the config properly, they 
should be able to pause only the relevant tasks and not all. What do you think?

Regarding pausing the partitions, I think it makes sense to pause only those 
partitions that have some data, as you both had discussed, for simplicity. 
Maybe we can look at heuristics, like pausing only the partition that accounts 
for, say, X% of the bytes, or picking the one with the most bytes. That might 
make it more complicated, but it would lead towards pausing only the relevant 
partitions, which is what `buffered.records.per.partition` is able to achieve.

 



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400651#comment-17400651
 ] 

Guozhang Wang commented on KAFKA-13152:
---

Yeah, that's a good point. I think we should not pause partitions that have no 
data yet. Maybe we can modify 1) above to "pause all partitions that have some 
data already".

As for "fairness", I think this is either achieved or lost at the consumer 
level, as we do round-robin fetching across all assigned partitions. Say 
partition A's message size is larger than partition B's, and assume their 
incoming record rates are the same; then partition B would have more records 
fetched than partition A on average.
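
The asymmetry described above can be illustrated with a quick byte-budget 
calculation (hypothetical numbers; `recordsPerFetch` is an illustrative helper, 
not a consumer API):

```java
// Hypothetical arithmetic sketch: a byte-bounded fetch (in the spirit of
// max.partition.fetch.bytes) yields more records for a partition with smaller
// average message size, given the same byte budget.
public class FetchFairness {

    // How many whole records fit into one fetch's byte budget.
    public static long recordsPerFetch(long fetchBudgetBytes, long avgRecordBytes) {
        return fetchBudgetBytes / avgRecordBytes;
    }

    public static void main(String[] args) {
        long budget = 1024L * 1024;  // 1 MiB per-partition fetch budget
        // Partition A: 10 KiB records; partition B: 1 KiB records.
        System.out.println("A records/fetch: " + recordsPerFetch(budget, 10L * 1024));
        System.out.println("B records/fetch: " + recordsPerFetch(budget, 1024L));
    }
}
```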



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400647#comment-17400647
 ] 

Matthias J. Sax commented on KAFKA-13152:
-

For (1), I think we need to take time-synchronization into account. If a task 
has multiple partitions, and some are empty, we might delay processing based on 
the `max.task.idle.ms` config – however, we should always allow fetching for 
partitions with empty buffers and never pause them; otherwise, we would 
sacrifice ordered processing, and I think a tradeoff between semantics and 
"buffer size" would not be a good one? We could even end up in a "temporal 
deadlock": no task is processable as each has at least one empty buffer, but 
all partitions are paused because we exceeded the max buffer space. The 
deadlock is temporal, because we would go into "forced processing" after 
`max.task.idle.ms` passed – or we need to change the behavior and go into 
"forced processing" right away for this case, without waiting for 
`max.task.idle.ms` (though it might be desirable to ignore `max.task.idle.ms` 
here).

Another question I have is about "balanced fetching": atm, we use the same 
buffer space for each partition and pause a single partition if its buffer 
space is exceeded. If we follow (1), could it happen that some partitions 
buffer much more data than others, and could this become a "fairness" problem? 
In the end, I agree that not having the exact same buffer space across all 
partitions can be beneficial: a high-volume topic might be better off using 
more buffer space than a low-volume topic. However, I wonder if we would still 
need some bounds, to avoid going from the current extreme of giving the exact 
same buffer space per partition to the opposite extreme, in which some 
partitions might "starve" as their buffer space becomes too small?

 



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400641#comment-17400641
 ] 

Guozhang Wang commented on KAFKA-13152:
---

[~sagarrao] [~mjsax] (also cc [~desai.p.rohan]) I had a slightly different idea 
(admittedly, probably a more complex one) when filing the ticket.

It is indeed a global config controlling the total number of bytes used for 
source partition record buffering, but it would not be distributed across all 
threads / tasks; instead we just monitor the aggregated total bytes across all 
tasks within the instance. When the limit has been reached, we can consider 
several options:

1) Just pause all partitions, and then resume all partitions when the total has 
dropped below the threshold. Not sure if it would result in much "thrashing" of 
pausing / resuming, but since these actions are quite cheap anyway, I'm not too 
worried about that.
2) Pause some partitions; e.g., one heuristic is to pick the partition with the 
most bytes, and then resume all paused partitions when the total has dropped 
below the threshold.

Personally I'm leaning towards 1) for now for simplicity, and we can consider 
whether this is sufficient after observing its behavior in production later.
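
Option 1) could be sketched roughly as a small state machine (a hypothetical 
sketch; the class and field names are illustrative, and in Kafka Streams the 
actual pause/resume would go through `Consumer#pause` / `Consumer#resume` on 
the assigned partitions):

```java
// Hypothetical sketch of option 1): track the aggregate buffered bytes across
// all tasks in the instance, pause everything when the global threshold is
// crossed, and resume everything once the total drops back below it.
public class GlobalBufferGate {
    private final long maxBytes;
    private long bufferedBytes = 0;
    private boolean paused = false;

    public GlobalBufferGate(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Called whenever records are added to (positive delta) or drained from
    // (negative delta) any task's buffer.
    public void update(long deltaBytes) {
        bufferedBytes += deltaBytes;
        if (!paused && bufferedBytes > maxBytes) {
            paused = true;   // would call consumer.pause(consumer.assignment())
        } else if (paused && bufferedBytes <= maxBytes) {
            paused = false;  // would call consumer.resume(consumer.assignment())
        }
    }

    public boolean isPaused() {
        return paused;
    }
}
```

Since pause/resume only affect which partitions the next poll fetches from, 
the transitions here are cheap, which is why the thrashing concern above is 
probably minor.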



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399395#comment-17399395
 ] 

Matthias J. Sax commented on KAFKA-13152:
-

The current config is a per-partition config, while we want a per-client 
config, i.e., the provided value must be distributed over all 
threads/tasks/partitions. Also note that, because threads can be added/removed 
dynamically, and the task (and thus partition) assignment may change during a 
rebalance, we need to dynamically adjust the current per-partition limit on any 
such event.

We would pause the corresponding partition that exceeds its "quota" (that is 
also what we already do right now), so nothing must be changed here.
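
The dynamic re-adjustment described above could be sketched as follows 
(a hypothetical helper, not Kafka Streams internals; the method name is 
illustrative):

```java
// Hypothetical sketch: split the client-wide byte budget evenly over the
// partitions currently assigned to this client. The result must be recomputed
// on every rebalance and whenever threads are added or removed, since the
// partition count changes on those events.
public class DynamicPartitionLimit {

    public static long perPartitionLimit(long clientBufferBytes, int assignedPartitions) {
        if (assignedPartitions == 0) {
            // Nothing assigned yet: no split needed.
            return clientBufferBytes;
        }
        return clientBufferBytes / assignedPartitions;
    }
}
```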



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-14 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399173#comment-17399173
 ] 

Sagar Rao commented on KAFKA-13152:
---

[~guozhang], I was looking at this. One question I have is: what do you mean by 
introducing a global config? Is it at a StreamTask level or above?

Also, in case the buffered records exceed the config, what actions should we 
take? Should we, for example, pause all consumers subscribed to the current 
StreamTask if the config is at a StreamTask level?



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-09 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396412#comment-17396412
 ] 

Sagar Rao commented on KAFKA-13152:
---

Hey [~guozhang], I would like to take this up. Will go through and post some 
notes here.
