[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401763#comment-17401763 ]

Sagar Rao commented on KAFKA-13152:
---

Thanks [~mjsax] / [~guozhang]. I will start writing a KIP and send out an email soon.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Major
> Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum
> number of records to bookkeep per partition; once it is exceeded, we pause
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage for this buffering. We should
> consider deprecating that config in favor of a global one, e.g.
> "input.buffer.max.bytes", which controls how many bytes in total are allowed
> to be buffered. This is doable since we buffer the raw records in .

-- This message was sent by Atlassian Jira (v8.3.4#803005)
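The global cap described in the ticket amounts to bookkeeping a single byte counter against a limit. A minimal sketch of that accounting follows; the class and method names are hypothetical illustrations, not the actual Kafka Streams internals:

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch: tracks the total bytes of buffered input records
 * against a single global cap (in the spirit of "input.buffer.max.bytes").
 * Illustrative only; not the real Streams implementation.
 */
class InputBufferBudget {
    private final long maxBytes;
    private final AtomicLong totalBytes = new AtomicLong();

    InputBufferBudget(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Called when a raw record is added to a partition buffer. */
    void recordAdded(long sizeInBytes) {
        totalBytes.addAndGet(sizeInBytes);
    }

    /** Called when a record is drained from a partition buffer for processing. */
    void recordRemoved(long sizeInBytes) {
        totalBytes.addAndGet(-sizeInBytes);
    }

    /** True when fetching should be paused because the global cap is exceeded. */
    boolean shouldPause() {
        return totalBytes.get() > maxBytes;
    }

    long totalBytes() {
        return totalBytes.get();
    }
}
```

The decision of *which* partitions to pause once `shouldPause()` returns true is exactly what the comments below debate.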
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401318#comment-17401318 ]

Guozhang Wang commented on KAFKA-13152:
---

I think it's a good idea to continue our discussion on the KIP. I'm not strongly suggesting that we do one option over the other; maybe I'm over-thinking by trying to prepare for the case where the task -> thread mapping is no longer static, which is not going to happen yet.
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401302#comment-17401302 ]

Matthias J. Sax commented on KAFKA-13152:
---

[~guozhang] – your proposal to only pause partitions with non-empty buffers SGTM. About fairness, I was not sure if we can/should rely on the consumer, but if you think it's the right way to go, I am ok with it. I just wanted to raise the question to ensure that we make a conscious decision.

[~sagarrao] – I think we should take the discussion into the KIP? It seems the scope is clear now, and we have two proposals: dividing the given buffer size across threads (or maybe even tasks etc.), or following Guozhang's proposal. It seems your concern is similar to my concern about fairness. Guozhang pointed out that we should achieve fairness within a thread (due to the consumer's round-robin fetching), but I guess your point is a good one: it's unclear if we achieve fairness across threads? [~guozhang] WDYT about this?

In the end the question seems to be whether we can/should keep it simple vs. how complex we want to design the algorithm. Personally, I am afraid of premature optimization and think keeping it simple might be the better way to get started. It might be best if you start to work on a KIP, explain the pros/cons of both approaches, put one into the "rejected alternatives" section, and we can discuss on the mailing list?
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401034#comment-17401034 ]

Sagar Rao commented on KAFKA-13152:
---

[~mjsax], [~guozhang] I had a slightly different thought: we could provision it as a global setting and then distribute it, similar to how `cache.max.bytes.buffering` works at the moment. So, assuming we have T threads and C is the max buffer size, each thread gets C/T bytes. Once the individual StreamThreads have been assigned their respective shares of bytes, we can look at the total bytes consumed across tasks for each thread, and if it exceeds its share, we can pause the partitions. We could go one step further and assign each task its share of bytes (by dividing equally), and finally bytes per partition (similar to the current per-partition records config, but in bytes and by further equal division), but that's just extra complexity and can be ignored.

Where assigning C/T bytes among threads might be beneficial, compared to the option Guozhang suggested here – *instead we just monitor the aggregated total bytes across all tasks within the instance, when it has been reached* – is this: say there is more than one StreamThread in an instance, and only one of them is exceeding its bounds individually. Because we are looking at the overall byte count across all tasks, the other threads would also end up paying the penalty and be paused. If users provision the config properly, they should be able to pause only the relevant tasks and not all of them. What do you think?

Regarding pausing the partitions, I think it makes sense to pause only those partitions that have some data, as you both had discussed, for simplicity. Maybe we can look at heuristics such as: if one partition accounts for, say, X% of the bytes, or is the one with the most bytes, pause only that one. That might make it more complicated, but it would lead towards pausing only the relevant partitions, which is what `buffered.records.per.partition` is able to achieve.
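The even C/T split described in this comment can be stated in a couple of lines. This is an illustrative sketch under the assumptions above (`cache.max.bytes.buffering` is divided across threads in a similar spirit today); the names are hypothetical:

```java
/**
 * Illustrative sketch of Sagar's proposal: a global byte budget C divided
 * evenly across T stream threads. Hypothetical names, not Streams code.
 */
class PerThreadBudget {
    /** Each of numThreads threads gets an equal share of totalBytes. */
    static long perThread(long totalBytes, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        return totalBytes / numThreads;
    }
}
```

A thread would then compare the bytes buffered across its own tasks against `perThread(C, T)` and pause its partitions when the share is exceeded.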
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400651#comment-17400651 ]

Guozhang Wang commented on KAFKA-13152:
---

Yeah, that's a good point. I think we should not pause partitions that have no data yet. Maybe we can modify 1) above to "pause all partitions that have some data already".

As for "fairness", I think this is either achieved or lost at the consumer level, since we do round-robin fetching across all assigned partitions. Say partition A's message size is larger than partition B's, and assume their incoming record rates are the same; then partition B would have more records fetched than partition A on average.
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400647#comment-17400647 ]

Matthias J. Sax commented on KAFKA-13152:
---

For (1), I think we need to take time-synchronization into account. If a task has multiple partitions and some are empty, we might delay processing based on the `max.task.idle.ms` config. However, we should always allow fetching for the partitions with empty buffers and never pause them; otherwise, we would sacrifice ordered processing, and I think a tradeoff between semantics and "buffer size" would not be a good one? We could even end up in a "temporal deadlock": no task is processable because each has at least one empty buffer, but all partitions are paused because we exceeded the max buffer space. The deadlock is only temporal, because we would go into "forced processing" after `max.task.idle.ms` has passed – or we change the behavior and go into "forced processing" right away for this case without waiting (though it may be desirable to ignore `max.task.idle.ms` here).

Another question I have is about "balanced fetching": at the moment, we use the same buffer space for each partition and pause a single partition if its buffer space is exceeded. If we follow (1), could it happen that some partitions buffer much more data than others, and could this become a "fairness" problem? In the end, I agree that not having the exact same buffer space across all partitions can be beneficial: a high-volume topic might be better off using more buffer space than a low-volume topic. However, I wonder if we would still need some bounds to avoid going from the current extreme (exactly the same buffer space per partition) to the opposite extreme, in which some partitions might "starve" as their buffer space becomes too small?
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400641#comment-17400641 ]

Guozhang Wang commented on KAFKA-13152:
---

[~sagarrao] [~mjsax] (also cc [~desai.p.rohan]) I had a slightly different idea (probably a more complex one, admittedly) when filing the ticket. It is indeed a global config controlling the total number of bytes used for source-partition record buffering, but it would not be distributed across all threads/tasks; instead, we just monitor the aggregated total bytes across all tasks within the instance. When it has been reached, we can consider several options:

1) Just pause all partitions, and then resume all partitions once the total has dropped below the threshold. I'm not sure if it would result in much "thrashing" of pausing/resuming, but since these actions are quite cheap anyway, I'm not too worried about that.

2) Pause some partitions – e.g., one heuristic is to pick the partition with the most bytes – and then resume all paused partitions once the total has dropped below the threshold.

Personally I'm leaning towards 1) for now for simplicity, and we can consider whether it is sufficient after observing its behavior in production.
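Option 1) above, combined with the refinement from the later comments to skip empty buffers, could look roughly like the following. This is a hypothetical sketch (partition names stand in for `TopicPartition`s, and the selection would feed the consumer's `pause()`/`resume()` calls); it is not the actual Streams logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of "option 1": once the aggregate buffered bytes
 * across all tasks reach the cap, pause every partition that already has
 * data buffered; pausing nothing (empty result) otherwise. Illustrative
 * only, not the real implementation.
 */
class PauseAllPolicy {
    /** Returns the partitions (by name) whose fetching should be paused. */
    static List<String> partitionsToPause(Map<String, Long> bufferedBytes,
                                          long maxBytes) {
        long total = 0;
        for (long bytes : bufferedBytes.values()) {
            total += bytes;
        }
        List<String> toPause = new ArrayList<>();
        if (total >= maxBytes) {
            for (Map.Entry<String, Long> e : bufferedBytes.entrySet()) {
                if (e.getValue() > 0) {  // never pause an empty buffer
                    toPause.add(e.getKey());
                }
            }
        }
        return toPause;
    }
}
```

Once the total drops back below `maxBytes`, the caller would simply resume everything it had paused, matching the "resume all" half of the option.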
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399395#comment-17399395 ]

Matthias J. Sax commented on KAFKA-13152:
---

The current config is per-partition. We want a per-client config, i.e., the provided value must be distributed over all threads/tasks/partitions. Also note that because threads can be added/removed dynamically, and the task (and thus partition) assignment may change during a rebalance, we need to dynamically adjust the per-partition limit on any such event. We would pause the corresponding partition that exceeds its "quota" (that is also what we already do right now), so nothing must be changed there.
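The dynamic re-division described in this comment amounts to recomputing each partition's quota whenever the thread count or assignment changes. A purely illustrative sketch, with hypothetical names and an assumed even split:

```java
/**
 * Illustrative sketch: a per-partition quota derived from a global cap,
 * recomputed whenever the number of threads or assigned partitions changes
 * (e.g. after a rebalance). Hypothetical; not the actual Streams code.
 */
class PerPartitionQuota {
    /** Even share of the global cap for each currently assigned partition. */
    static long quota(long globalMaxBytes, int assignedPartitions) {
        if (assignedPartitions <= 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        return globalMaxBytes / assignedPartitions;
    }
}
```

After a rebalance changes the assignment from, say, 12 partitions to 8, the same global cap yields a larger per-partition quota, which is the dynamic adjustment the comment calls for.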
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399173#comment-17399173 ]

Sagar Rao commented on KAFKA-13152:
---

[~guozhang], I was looking at this. One question I have is: what do you mean by introducing a global config? Is it at the StreamTask level or above? Also, in case the buffered records exceed the config, what actions should we take? Should we, for example, pause all consumers subscribed to the current StreamTask if the config is at the StreamTask level?
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396412#comment-17396412 ]

Sagar Rao commented on KAFKA-13152:
---

Hey [~guozhang], I would like to take this up. I will go through it and post some notes here.