[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231123#comment-17231123 ] Richard Yu commented on KAFKA-10575: Thanks for letting me know! I was already looking at StoreChangelogReader, since that was probably one of only two places where onRestoreEnd was called. Hopefully, I will be able to pull together a PR that tackles this issue. > StateRestoreListener#onRestoreEnd should always be triggered > > > Key: KAFKA-10575 > URL: https://issues.apache.org/jira/browse/KAFKA-10575 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Priority: Major > > Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete the restoration of an active task and transition it to the running state. However, the restoration can also be stopped when the restoring task gets closed (for example, because it gets migrated to another client). We should also trigger the callback, indicating its progress, whenever the restoration is stopped, in any scenario. -- This message was sent by Atlassian Jira (v8.3.4#803005)
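The callback contract the ticket asks for (onRestoreEnd firing both on normal completion and when a restoring task is closed early) can be sketched as follows. This is a conceptual Python model, not Kafka's actual Java StateRestoreListener API; the ChangelogReader and listener names here are illustrative only.

```python
# Conceptual sketch (not Kafka's Java API): the restore-end callback fires
# both when restoration completes normally and when a restoring task is
# closed early (e.g. migrated to another client). All names are illustrative.

class RestoreListener:
    def __init__(self):
        self.ended = []  # (store_name, total_restored, reason) tuples

    def on_restore_end(self, store_name, total_restored, reason):
        self.ended.append((store_name, total_restored, reason))

class ChangelogReader:
    def __init__(self, listener):
        self.listener = listener
        self.restored = {}  # store_name -> records restored so far

    def restore_batch(self, store_name, count):
        self.restored[store_name] = self.restored.get(store_name, 0) + count

    def complete(self, store_name):
        # Normal path: restoration finished, task transitions to RUNNING.
        self.listener.on_restore_end(
            store_name, self.restored.pop(store_name, 0), "completed")

    def close(self, store_name):
        # The fix proposed in the ticket: also fire the callback when the
        # restoring task is closed before restoration finishes.
        self.listener.on_restore_end(
            store_name, self.restored.pop(store_name, 0), "closed")
```

Either way the listener learns how far restoration got, which is the point of the ticket.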
[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228890#comment-17228890 ] Richard Yu commented on KAFKA-10575: [~guozhang] I'm interested in picking this one up. May I try my hand at it?
[jira] [Comment Edited] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136786#comment-17136786 ] Richard Yu edited comment on KAFKA-8770 at 6/16/20, 4:25 PM: - [~gyammine] Thanks for the comment! I apologize for the hiatus in tackling this issue. I am planning on getting more of this KIP implemented. [~vvcephei] I will ping you once the PR is in a good state. :) was (Author: yohan123): [~gyammine] Thanks for the comment! I apologize for the hiatus in tackling this issue. I am planning on tackling this next. Hope we can get it into the next release. [~vvcephei] Will ping you once the PR for FK-join is ready for review. :) > Either switch to or add an option for emit-on-change > > > Key: KAFKA-8770 > URL: https://issues.apache.org/jira/browse/KAFKA-8770 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: John Roesler > Priority: Major > Labels: needs-kip > > Currently, Streams offers two emission models: > * emit-on-window-close (using Suppression) > * emit-on-update (i.e., emit a new result whenever a new record is processed, regardless of whether the result has changed) > There is also an option to drop some intermediate results, using either caching or suppression. > However, there is no support for emit-on-change, in which results would be forwarded only if the result has changed. This has been reported to be extremely valuable as a performance optimization for some high-traffic applications, and it reduces the computational burden both internally, for downstream Streams operations, and for external systems that consume the results and currently have to deal with a lot of "no-op" changes. > It would be pretty straightforward to implement this by loading the prior result before a stateful operation and comparing it with the new result before persisting or forwarding. In many cases, we load the prior result anyway, so it may not have a significant performance impact either. > One design challenge is what to do with timestamps. If we get one record at time 1 that produces a result, and then another at time 2 that produces a no-op, what should be the timestamp of the result, 1 or 2? emit-on-change would require us to say 1. > Clearly, we'd need to do some serious benchmarking to evaluate any potential implementation of emit-on-change. > Another design challenge is deciding whether we should just automatically provide emit-on-change for stateful operators, or whether it should be configurable. Configuration increases complexity, so unless the performance impact is high, we may just want to change the emission model without a configuration.
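The emit-on-change logic described above (load the prior result, compare, forward only on change, and keep the timestamp of the record that first produced the value) can be sketched roughly as follows. This is an illustrative Python model of the proposal, not Kafka Streams code; the `process` function and its parameters are assumptions for illustration.

```python
# Illustrative model of emit-on-change for a stateful operator: forward a
# result only when it differs from the stored prior result, and keep the
# timestamp of the record that first produced that value (the "say 1" rule).

def process(store, forwarded, key, value, timestamp):
    prior = store.get(key)  # in many cases the prior result is loaded anyway
    if prior is not None and prior[0] == value:
        return  # no-op update: suppress it, keeping the earlier timestamp
    store[key] = (value, timestamp)
    forwarded.append((key, value, timestamp))
```

Note how a record at time 2 producing the same value as the record at time 1 is dropped entirely, so downstream consumers never see the "no-op" change.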
[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136786#comment-17136786 ] Richard Yu commented on KAFKA-8770: --- [~gyammine] Thanks for the comment! I apologize for the hiatus in tackling this issue. I am planning on tackling this next. Hope we can get it into the next release. [~vvcephei] Will ping you once the PR for FK-join is ready for review. :)
[jira] [Commented] (KAFKA-9808) Refactor State Store Hierarchy
[ https://issues.apache.org/jira/browse/KAFKA-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074752#comment-17074752 ] Richard Yu commented on KAFKA-9808: --- My assumption is that to make this refactoring more doable, we can probably break the issue down and tackle the basic store types one at a time: # KeyValueStore # SessionStore # WindowStore # SegmentedBytesStore Not all of them, I suppose, would require extensive cleanup, but it's good to keep in mind. > Refactor State Store Hierarchy > -- > > Key: KAFKA-9808 > URL: https://issues.apache.org/jira/browse/KAFKA-9808 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Richard Yu > Priority: Major > Attachments: Current State Store Hierarchy.pdf > > > Over years of development, Kafka contributors have been adding more and more state store classes on top of each other without much regard for making them approachable for future modification. For instance, it has become increasingly difficult to add new APIs to state store classes while, at the same time, preventing them from being exposed to users. > In sum, the entire hierarchy is slowly spiraling out of control, and there is a growing need to consolidate the multiple state store types into a few more manageable ones for future Kafka developers. > Note: There have already been a couple of attempts to simplify the state store hierarchy; while the task isn't too complex, it's the enormous scope of the change that makes things difficult.
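One hypothetical direction for the consolidation discussed above: a single low-level bytes store with thin typed facades on top, instead of a parallel class tree per store type. The sketch below is Python purely for illustration; all class names are assumptions, not Kafka's actual store classes.

```python
# Hypothetical shape of a consolidated hierarchy: one bytes-level store,
# with typed stores implemented as thin facades that serialize at the
# boundary. Class names are illustrative, not from Kafka's codebase.

class BytesStore:
    """Single low-level store all typed facades delegate to."""
    def __init__(self):
        self._data = {}

    def put(self, key: bytes, value: bytes) -> None:
        self._data[key] = value

    def get(self, key: bytes):
        return self._data.get(key)

class KeyValueFacade:
    """Typed view over a shared BytesStore; new APIs live on the facade,
    keeping the low-level store internal and unexposed to users."""
    def __init__(self, inner: BytesStore):
        self.inner = inner

    def put(self, key: str, value: str) -> None:
        self.inner.put(key.encode(), value.encode())

    def get(self, key: str):
        raw = self.inner.get(key.encode())
        return None if raw is None else raw.decode()
```

A SessionStore or WindowStore facade would follow the same pattern, so each of the four basic store types could be tackled one at a time, as the comment suggests.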
[jira] [Updated] (KAFKA-9808) Refactor State Store Hierarchy
[ https://issues.apache.org/jira/browse/KAFKA-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9808: -- Attachment: Current State Store Hierarchy.pdf
[jira] [Created] (KAFKA-9808) Refactor State Store Hierarchy
Richard Yu created KAFKA-9808: - Summary: Refactor State Store Hierarchy Key: KAFKA-9808 URL: https://issues.apache.org/jira/browse/KAFKA-9808 Project: Kafka Issue Type: Improvement Components: streams Reporter: Richard Yu
[jira] [Commented] (KAFKA-9347) Detect deleted log directory before becoming leader
[ https://issues.apache.org/jira/browse/KAFKA-9347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070549#comment-17070549 ] Richard Yu commented on KAFKA-9347: --- [~hachikuji] Can I pick this one up? > Detect deleted log directory before becoming leader > --- > > Key: KAFKA-9347 > URL: https://issues.apache.org/jira/browse/KAFKA-9347 > Project: Kafka > Issue Type: Improvement > Reporter: Jason Gustafson > Priority: Major > Labels: needs-discussion > > There is currently no protection to prevent a broker whose log directory has been deleted from becoming the leader of a partition that it still remains in the ISR of. This scenario can happen when the last remaining replica in the ISR is shut down: it will remain in the ISR and be eligible for leadership as soon as it starts up. It would be useful to detect this situation dynamically in order to force the user either to do an unclean election or to recover another broker. One option might be to simply pass a flag on startup specifying that a broker should not be eligible for leadership.
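The two options the ticket mentions (dynamic detection of a missing log directory, and an explicit startup flag) could look roughly like the sketch below. This is a Python illustration only; the function name and flag are assumptions, not Kafka configuration.

```python
# Sketch of the proposed safeguard: a broker declines leadership
# eligibility if its log directory is gone, or if an explicit startup
# flag says so. Names here are assumptions for illustration.

import os

def eligible_for_leadership(log_dir: str, force_ineligible: bool = False) -> bool:
    # Explicit opt-out, as suggested at the end of the ticket.
    if force_ineligible:
        return False
    # Dynamic detection: a deleted log directory means the local replica
    # data is gone, so becoming leader would silently lose history.
    return os.path.isdir(log_dir)
```

The controller (or the broker itself at startup) would consult this check before allowing the broker to take leadership of any partition it still sits in the ISR of.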
[jira] [Commented] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064328#comment-17064328 ] Richard Yu commented on KAFKA-9733: --- [~bchen225242] Do you know how much use this would have for Kafka? The design and implementation for this issue would no doubt be horrendously complex, so what's your take? > Consider addition of leader quorum in replication model > --- > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core > Reporter: Richard Yu > Priority: Minor > > Kafka's current replication model (with its single leader and several followers) is somewhat similar to the consensus algorithms currently used in databases (Raft), with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault-tolerance issues as do other distributed systems that rely on Raft: the leader tends to be the choke point for failures, i.e., if it goes down, there will be a brief stop-the-world effect. > In contrast, giving all replicas the power to read from and write to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is very hard to maintain in such an algorithm, and there is very little gain compared to the overhead. > Therefore, I propose an intermediate plan between these two algorithms: the leader replica quorum. In essence, there would be multiple leaders (with the power for both reads and writes), but the number of leaders would not be excessive (i.e., maybe three at max). How we achieve consistency is simple: > * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must vote on whether such an operation is granted. > * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. > * If, say, more than half of the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. > The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. However, there would be multiple gains from this design over the old model: > # The single-leader failure bottleneck is alleviated to a certain extent, since there are now multiple leader replicas. > # Write updates will potentially no longer be bottlenecked at one single leader (since there are multiple leaders available). On a related note, there has been a KIP that allows clients to read from non-leader replicas. (will add the KIP link soon) > Some might note that the overhead of maintaining consistency among multiple leaders might offset these gains. That might be true with a large number of leaders, but with a small number (capped at 3 as mentioned above), the overhead will also be correspondingly small. (How latency will be affected is unknown until further testing, but more than likely, this option will probably be configurable depending on user requirements.)
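The majority-vote write path sketched in the description can be modeled as follows. This is a toy Python illustration of the proposal only, not an implementation; the `Leader` class and `commit_write` function are hypothetical.

```python
# Toy model of the proposed leader-quorum write path: a write update is
# forwarded to followers only after a majority of the small leader group
# (e.g. three members) has integrated it. Purely illustrative names.

class Leader:
    """Hypothetical leader replica; an unhealthy one rejects proposals."""
    def __init__(self, healthy=True):
        self.healthy = healthy
        self.log = []

    def try_apply(self, update):
        if not self.healthy:
            return False
        self.log.append(update)  # integrate the update into local state
        return True

def commit_write(leaders, update):
    """Forward downstream only if a majority of leaders applied the write."""
    approvals = sum(1 for leader in leaders if leader.try_apply(update))
    return approvals * 2 > len(leaders)
```

With the group capped at three, a single failed leader no longer blocks writes, which is the fault-tolerance gain the description claims over the single-leader model.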
[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three at max). How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. 
However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will potentially no longer be bottlenecked at one single leader (since there are multiple leaders available). On a related note, there has been a KIP that allows clients to read from non-leader replicas. (will add the KIP link soon). Some might note that the overhead from maintaining consistency among multiple leaders might offset these gains. That might be true, with a large number of leaders, but with a small number then (capped at 3 as mentioned above), the overhead will also be correspondingly small. (How latency will be affected is unknown until further testing, but more than likely, this option will probably be. configurable depending on user requirements). was: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or five at max). 
How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will potentially no longer be bottlenecked at one single leader (since there are mul
[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or five at max). How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. 
However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will no longer be bottlenecked at one single leader (since there are multiple leaders available). On a related note, there has been a KIP that allows clients to read from non-leader replicas. (will add the KIP link soon). Some might note that the overhead from maintaining consistency among multiple leaders might offset these gains. That might be true, with a large number of leaders, but with a small number then (capped at 3 or 4 as mentioned above), the overhead will also be correspondingly small. (How latency will be affected is unknown until further testing, but more than likely, this option will probably be. configurable depending on user requirements). was: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or four at max). 
How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will no longer be bottlenecked at one single leader (since there are multiple leade
[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or five at max). How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. 
However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will potentially no longer be bottlenecked at one single leader (since there are multiple leaders available). On a related note, there has been a KIP that allows clients to read from non-leader replicas. (will add the KIP link soon). Some might note that the overhead from maintaining consistency among multiple leaders might offset these gains. That might be true, with a large number of leaders, but with a small number then (capped at 3 or 5 as mentioned above), the overhead will also be correspondingly small. (How latency will be affected is unknown until further testing, but more than likely, this option will probably be. configurable depending on user requirements). was: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or five at max). 
How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will no longer be bottlenecked at one single leader (since there are mu
[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or four at max). How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. 
However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will no longer be bottlenecked at one single leader (since there are multiple leaders available). On a related note, there has been a KIP that allows clients to read from non-leader replicas. (will add the KIP link soon). Some might note that the overhead from maintaining consistency among multiple leaders might offset these gains. That might be true with a large number of leaders, but with a small number of them (capped at 3 or 4 as mentioned above), the overhead will also be correspondingly small. (How latency will be affected is unknown until further testing, but more than likely, this option will probably be configurable depending on user requirements.) was: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or four at max). 
How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail.
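The majority-approval write path described above could be sketched roughly as follows. This is a purely illustrative Python sketch of the proposal: the `QuorumLeader` class, its methods, and the approval rule are hypothetical names invented for demonstration, not anything that exists in Kafka.

```python
# Illustrative model of the proposed write path: a leader applies a write
# locally, proposes it to its peer leaders, and forwards it downstream only
# once more than half of the other leaders have integrated it.

class QuorumLeader:
    def __init__(self, leader_id, peers):
        self.leader_id = leader_id
        self.peers = peers  # the other leaders in the quorum
        self.log = []       # this leader's copy of the stored data

    def apply(self, record):
        # Integrate the proposed write; returning True signals approval
        # ("the green light") back to the proposing leader.
        self.log.append(record)
        return True

    def propose(self, record):
        # Propose the write to the other leaders and count approvals.
        self.apply(record)
        approvals = sum(1 for peer in self.peers if peer.apply(record))
        # A majority of the *other* leaders must agree before the change
        # is forwarded downstream to the follower replicas.
        return approvals > len(self.peers) // 2

peers = [QuorumLeader(i, []) for i in (1, 2)]
leader = QuorumLeader(0, peers)
committed = leader.propose("record-1")  # True: both peers approved
```

Note that with the quorum capped at three or four leaders, the vote involves at most two or three round trips per write, which is the basis for the claim above that the consistency overhead stays small.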
[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate plan in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or four at max). How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. The algorithm for maintaining consistency between multiple leaders will still have to be worked out in detail. 
However, there would be multiple gains from this design over the old model: # The single leader failure bottleneck has been alleviated to a certain extent, since there are now multiple leader replicas. # Write updates will no longer be bottlenecked at one single leader (since there are multiple leaders available). On a related note, there has been a KIP that allows clients to read from non-leader replicas. (will add the KIP link soon). Some might note that the overhead from maintaining consistency among multiple leaders might offset these gains. That might be true with a large number of leaders, but with a small number of them (capped at 3 or 4 as mentioned above), the overhead will also be correspondingly small. (How latency will be affected is unknown until further testing, but more than likely, this option will probably be configurable depending on user requirements.) was: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate step in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or four at max). 
How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. > Consider addition of leader quorum in replication model > --- > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Richard Yu >Priority: Minor
[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate step in between these two algorithms, and that is the leader replica quorum. In essence, there will be multiple leaders (which have the power for both read and writes), but the number of leaders will not be excessive (i.e. maybe three or four at max). How we achieve consistency is simple: * Any leader has the power to propose a write update to other replicas. But before passing a write update to a follower, the other leaders must elect if such an operation is granted. * In principle, a leader will propose a write update to the other leaders, and once the other leaders have integrated that write update into their version of the stored data, they will also give the green light. * If say, more than half the other leaders have agreed that the current change is good to go, then we can forward the change downstream to the other replicas. was: Note: Description still not finished. Still not sure if this is needed. 
Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate step in between these two algorithms, and that is the leader partition quorum. > Consider addition of leader quorum in replication model > --- > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Richard Yu >Priority: Minor > > Kafka's current replication model (with its single leader and several > followers) is somewhat similar to the current consensus algorithms being used > in databases (RAFT) with the major difference being the existence of the ISR. > Consequently, Kafka suffers from the same fault tolerance issues as does > other distributed systems which rely on RAFT: the leader tends to be the > chokepoint for failures i.e. if it goes down, it will have a brief > stop-the-world effect. > In contrast, giving all replicas the power to write and read to other > replicas is also difficult to accomplish (as emphasized by the complexity of > the Egalitarian Paxos algorithm), since consistency is so hard to maintain in > such an algorithm, plus very little gain compared to the overhead. 
> Therefore, I propose that we have an intermediate step in between these two > algorithms, and that is the leader replica quorum. In essence, there will be > multiple leaders (which have the power for both read and writes), but the > number of leaders will not be excessive (i.e. maybe three or four at max). > How we achieve consistency is simple: > * Any leader has the power to propose a write update to other replicas. But > before passing a write update to a follower, the other leaders must elect if > such an operation is granted. > * In principle, a leader will propose a write update to the other leaders, > and once the other leaders have integrated that write update into their > version of the stored data, they will also give the green light. > * If say, more than half the other leaders have agreed that the current > change is good to go, then we can > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Summary: Consider addition of leader quorum in replication model (was: Consider addition of leader partition quorum) > Consider addition of leader quorum in replication model > --- > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Richard Yu >Priority: Minor > > Note: Description still not finished. Still not sure if this is needed. > Kafka's current replication model (with its single leader and several > followers) is somewhat similar to the current consensus algorithms being used > in databases (RAFT) with the major difference being the existence of the ISR. > Consequently, Kafka suffers from the same fault tolerance issues as does > other distributed systems which rely on RAFT: the leader tends to be the > chokepoint for failures i.e. if it goes down, it will have a brief > stop-the-world effect. > In contrast, giving all replicas the power to write and read to other > replicas is also difficult to accomplish (as emphasized by the complexity of > the Egalitarian Paxos algorithm), since consistency is so hard to maintain in > such an algorithm, plus very little gain compared to the overhead. > Therefore, I propose that we have an intermediate step in between these two > algorithms, and that is the leader partition quorum. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9733) Consider addition of leader partition quorum
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Note: Description still not finished. Still not sure if this is needed. Kafka's current replication model (with its single leader and several followers) is somewhat similar to the current consensus algorithms being used in databases (RAFT) with the major difference being the existence of the ISR. Consequently, Kafka suffers from the same fault tolerance issues as does other distributed systems which rely on RAFT: the leader tends to be the chokepoint for failures i.e. if it goes down, it will have a brief stop-the-world effect. In contrast, giving all replicas the power to write and read to other replicas is also difficult to accomplish (as emphasized by the complexity of the Egalitarian Paxos algorithm), since consistency is so hard to maintain in such an algorithm, plus very little gain compared to the overhead. Therefore, I propose that we have an intermediate step in between these two algorithms, and that is the leader partition quorum. was: Note: Description still not finished. Still not sure if this is needed. This feature I'm proposing might not offer too much of a performance boost, but I think it is still worth considering. In our current replication model, we have a single leader and several followers (with our ISR included). However, the current bottleneck would be that once the leader goes down, it will take a while to get the next leader online, which is a serious pain. (also leading to a considerable write/read delay) In order to help alleviate this issue, we can consider multiple clusters independent of each other i.e. each of them are their own leader/follower group for the _same partition set_. The difference here is that these clusters can _communicate_ between one another. 
At first, this might seem redundant, but there is reasoning behind this: # Let's say we have two leader/follower groups (I must note that these two groups do _not_ have shared memory) for the same replicated partition. # One leader goes down, and that means for the respective followers, they would under normal circumstances be unable to receive new write updates. # However, in this situation, we can have those followers poll their write/read requests from the other group whose leader has _not gone down._ It doesn't necessarily have to be the leader either; it can be other members from that group's ISR. # The idea here is that if the members of these two groups detect that they are lagging behind one another, they would be able to poll one another for updates. So what is the difference here from just having multiple leaders in a single cluster? The answer is that the leader is responsible for making sure that there is consistency within _its own cluster._ Not the other cluster it is in communication with. > Consider addition of leader partition quorum > > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Richard Yu >Priority: Minor > > Note: Description still not finished. Still not sure if this is needed. > Kafka's current replication model (with its single leader and several > followers) is somewhat similar to the current consensus algorithms being used > in databases (RAFT) with the major difference being the existence of the ISR. > Consequently, Kafka suffers from the same fault tolerance issues as does > other distributed systems which rely on RAFT: the leader tends to be the > chokepoint for failures i.e. if it goes down, it will have a brief > stop-the-world effect. 
> In contrast, giving all replicas the power to write and read to other > replicas is also difficult to accomplish (as emphasized by the complexity of > the Egalitarian Paxos algorithm), since consistency is so hard to maintain in > such an algorithm, plus very little gain compared to the overhead. > Therefore, I propose that we have an intermediate step in between these two > algorithms, and that is the leader partition quorum. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9733) Consider addition of leader partition quorum
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Summary: Consider addition of leader partition quorum (was: Consider addition to Kafka's replication model) > Consider addition of leader partition quorum > > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Richard Yu >Priority: Minor > > Note: Description still not finished. Still not sure if this is needed. > This feature I'm proposing might not offer too much of a performance boost, > but I think it is still worth considering. In our current replication model, > we have a single leader and several followers (with our ISR included). > However, the current bottleneck would be that once the leader goes down, it > will take a while to get the next leader online, which is a serious pain. > (also leading to a considerable write/read delay) > In order to help alleviate this issue, we can consider multiple clusters > independent of each other i.e. each of them are their own leader/follower > group for the _same partition set_. The difference here is that these > clusters can _communicate_ between one another. > At first, this might seem redundant, but there is a reasoning to this: > # Let's say we have two leader/follower groups (I must note that these two > groups does _not_ have shared memory) for the same replicated partition. > # One leader goes down, and that means for the respective followers, they > would under normal circumstances be unable to receive new write updates. > # However, in this situation, we can have those followers poll their > write/read requests from the other group whose leader has _not gone down._ It > doesn't necessarily have to be the leader either, it can be other members > from that group's ISR. 
> # The idea here is that if the members of these two groups detect that they > are lagging behind another, they would be able to poll one another for > updates. > So what is the difference here from just having multiple leaders in a single > cluster? > The answer is that the leader is responsible for making sure that there is > consistency within _its own cluster._ Not the other cluster it is in > communication with.
[jira] [Updated] (KAFKA-9733) Consider addition to Kafka's replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Note: Description still not finished. Still not sure if this is needed. This feature I'm proposing might not offer too much of a performance boost, but I think it is still worth considering. In our current replication model, we have a single leader and several followers (with our ISR included). However, the current bottleneck would be that once the leader goes down, it will take a while to get the next leader online, which is a serious pain. (also leading to a considerable write/read delay) In order to help alleviate this issue, we can consider multiple clusters independent of each other, i.e. each of them is its own leader/follower group for the _same partition set_. The difference here is that these clusters can _communicate_ with one another. At first, this might seem redundant, but there is reasoning behind this: # Let's say we have two leader/follower groups (I must note that these two groups do _not_ have shared memory) for the same replicated partition. # One leader goes down, and that means for the respective followers, they would under normal circumstances be unable to receive new write updates. # However, in this situation, we can have those followers poll their write/read requests from the other group whose leader has _not gone down._ It doesn't necessarily have to be the leader either; it can be other members from that group's ISR. # The idea here is that if the members of these two groups detect that they are lagging behind one another, they would be able to poll one another for updates. So what is the difference here from just having multiple leaders in a single cluster? The answer is that the leader is responsible for making sure that there is consistency within _its own cluster._ Not the other cluster it is in communication with. was: Note: Description still not finished. 
This feature I'm proposing might not offer too much of a performance boost, but I think it is still worth considering. In our current replication model, we have a single leader and several followers (with our ISR included). However, the current bottleneck would be that once the leader goes down, it will take a while to get the next leader online, which is a serious pain. (also leading to a considerable write/read delay) In order to help alleviate this issue, we can consider multiple clusters independent of each other i.e. each of them are their own leader/follower group for the _same partition set_. The difference here is that these clusters can _communicate_ between one another. At first, this might seem redundant, but there is a reasoning to this: # Let's say we have two leader/follower groups (I must note that these two groups does _not_ have shared memory) for the same replicated partition. # One leader goes down, and that means for the respective followers, they would under normal circumstances be unable to receive new write updates. # However, in this situation, we can have those followers poll their write/read requests from the other group whose leader has _not gone down._ It doesn't necessarily have to be the leader either, it can be other members from that group's ISR. # The idea here is that if the members of these two groups detect that they are lagging behind another, they would be able to poll one another for updates. So what is the difference here from just having multiple leaders in a single cluster? The answer is that the leader is responsible for making sure that there is consistency within _its own cluster._ Not the other cluster it is in communication with. > Consider addition to Kafka's replication model > -- > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Richard Yu >Priority: Minor > > Note: Description still not finished. 
Still not sure if this is needed. > This feature I'm proposing might not offer too much of a performance boost, > but I think it is still worth considering. In our current replication model, > we have a single leader and several followers (with our ISR included). > However, the current bottleneck would be that once the leader goes down, it > will take a while to get the next leader online, which is a serious pain. > (also leading to a considerable write/read delay) > In order to help alleviate this issue, we can consider multiple clusters > independent of each other i.e. each of them are their own leader/follower > group for the _same partition set_. The difference here is that these > clusters can _communicate_ between one another.
[jira] [Updated] (KAFKA-9733) Consider addition to Kafka's replication model
[ https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9733: -- Description: Note: Description still not finished. This feature I'm proposing might not offer too much of a performance boost, but I think it is still worth considering. In our current replication model, we have a single leader and several followers (with our ISR included). However, the current bottleneck would be that once the leader goes down, it will take a while to get the next leader online, which is a serious pain. (also leading to a considerable write/read delay) In order to help alleviate this issue, we can consider multiple clusters independent of each other, i.e. each of them is its own leader/follower group for the _same partition set_. The difference here is that these clusters can _communicate_ with one another. At first, this might seem redundant, but there is reasoning behind this: # Let's say we have two leader/follower groups (I must note that these two groups do _not_ have shared memory) for the same replicated partition. # One leader goes down, and that means for the respective followers, they would under normal circumstances be unable to receive new write updates. # However, in this situation, we can have those followers poll their write/read requests from the other group whose leader has _not gone down._ It doesn't necessarily have to be the leader either; it can be other members from that group's ISR. # The idea here is that if the members of these two groups detect that they are lagging behind one another, they would be able to poll one another for updates. So what is the difference here from just having multiple leaders in a single cluster? The answer is that the leader is responsible for making sure that there is consistency within _its own cluster._ Not the other cluster it is in communication with. was: Note: Description still not finished. 
This feature I'm proposing might not offer too much of a performance boost, but I think it is still worth considering. In our current replication model, we have a single leader and several followers (with our ISR included). However, the current bottleneck would be that once the leader goes down, it will take a while to get the next leader online, which is a serious pain. (also leading to a considerable write/read delay) In order to help alleviate this issue, we can consider multiple clusters independent of each other i.e. each of them are their own leader/follower group for the _same partition set_. The difference here is that these clusters can _communicate_ between one another. At first, this might seem redundant, but there is a reasoning to this: # Let's say we have two leader/follower groups for the same replicated partition. # One leader goes down, and that means for the respective followers, they would under normal circumstances be unable to receive new write updates. # However, in this situation, we can have those followers poll their write/read requests from the other group whose leader has _not gone down._ It doesn't necessarily have to be the leader either, it can be other members from that group's ISR. # The idea here is that if the members of these two groups detect that they are lagging behind another, they would be able to poll one another for updates. So what is the difference here from just having multiple leaders in a single cluster? The answer is that the leader is responsible for making sure that there is consistency within _its own cluster._ Not the other cluster it is in communication with. > Consider addition to Kafka's replication model > -- > > Key: KAFKA-9733 > URL: https://issues.apache.org/jira/browse/KAFKA-9733 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Richard Yu >Priority: Minor > > Note: Description still not finished. 
> This feature I'm proposing might not offer too much of a performance boost, > but I think it is still worth considering. In our current replication model, > we have a single leader and several followers (with our ISR included). > However, the current bottleneck would be that once the leader goes down, it > will take a while to get the next leader online, which is a serious pain. > (also leading to a considerable write/read delay) > In order to help alleviate this issue, we can consider multiple clusters > independent of each other i.e. each of them are their own leader/follower > group for the _same partition set_. The difference here is that these > clusters can _communicate_ between one another.
[jira] [Created] (KAFKA-9733) Consider addition to Kafka's replication model
Richard Yu created KAFKA-9733: - Summary: Consider addition to Kafka's replication model Key: KAFKA-9733 URL: https://issues.apache.org/jira/browse/KAFKA-9733 Project: Kafka Issue Type: New Feature Components: clients, core Reporter: Richard Yu Note: Description still not finished. This feature I'm proposing might not offer too much of a performance boost, but I think it is still worth considering. In our current replication model, we have a single leader and several followers (with our ISR included). However, the current bottleneck would be that once the leader goes down, it will take a while to get the next leader online, which is a serious pain. (also leading to a considerable write/read delay) In order to help alleviate this issue, we can consider multiple clusters independent of each other, i.e. each of them is its own leader/follower group for the _same partition set_. The difference here is that these clusters can _communicate_ with one another. At first, this might seem redundant, but there is reasoning behind this: # Let's say we have two leader/follower groups for the same replicated partition. # One leader goes down, and that means the respective followers would under normal circumstances be unable to receive new write updates. # However, in this situation, we can have those followers poll their write/read requests from the other group, whose leader has _not gone down._ It doesn't necessarily have to be the leader either; it can be other members of that group's ISR. # The idea here is that if the members of these two groups detect that they are lagging behind one another, they would be able to poll one another for updates. So what is the difference here from just having multiple leaders in a single cluster? The answer is that the leader is responsible for making sure that there is consistency within _its own cluster_, not the other cluster it is in communication with.
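The cross-group catch-up steps above can be sketched as follows. This is a hypothetical Python illustration of the proposal; the `Replica` and `Group` classes and the `catch_up` helper are invented names for demonstration and do not correspond to any Kafka code.

```python
# Two independent leader/follower groups replicate the same partition; when a
# follower's own leader is down, it polls a live member of the sibling group
# for the records it is missing.

class Replica:
    def __init__(self, records=None):
        self.records = list(records or [])

    def fetch_from(self, offset):
        # Serve all records at and after the requested offset.
        return self.records[offset:]

class Group:
    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers
        self.leader_alive = True

    def live_member(self):
        # Simplification: only the leader serves reads here; the proposal
        # also allows other ISR members of the group to do so.
        return self.leader if self.leader_alive else None

def catch_up(follower, own_group, sibling_group):
    # Prefer the local leader; fall back to a live member of the sibling
    # group when the local leader has gone down.
    source = own_group.live_member() or sibling_group.live_member()
    if source is not None:
        follower.records += source.fetch_from(len(follower.records))

# Group A's leader fails; A's follower catches up from group B instead.
group_a = Group(Replica(), [Replica()])
group_b = Group(Replica(["r0", "r1", "r2"]), [])
group_a.leader_alive = False
catch_up(group_a.followers[0], group_a, group_b)
```

The sketch also shows the distinction drawn above: each `Group` leader is only ever the write authority for its own members, and the sibling group is consulted purely as a read source during catch-up.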
[jira] [Comment Edited] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013370#comment-17013370 ] Richard Yu edited comment on KAFKA-8770 at 1/11/20 4:23 AM: I have created a draft KIP for this JIRA. [~xmar] [~mjsax] [~vvcephei] Input would be greatly appreciated! Right now, I've not formalized any API additions / configuration changes. It would be good first if we can get some discussion on what is needed and what is not! [https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams] was (Author: yohan123): I have created a draft KIP for this JIRA. [~xmar] [~mjsax] [~vvcephei] Input would be greatly appreciated! Right now, I've not formalized any API additions / configuration changes. It would be good first if we can get some discussion on what is needed and what is not! [https://cwiki.apache.org/confluence/display/KAFKA/KIP-NUM%3A+Add+emit+on+change+support+for+Kafka+Streams#KIP-NUM:AddemitonchangesupportforKafkaStreams-DetailsonCoreImprovement] > Either switch to or add an option for emit-on-change > > > Key: KAFKA-8770 > URL: https://issues.apache.org/jira/browse/KAFKA-8770 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Currently, Streams offers two emission models: > * emit-on-window-close: (using Suppression) > * emit-on-update: (i.e., emit a new result whenever a new record is > processed, regardless of whether the result has changed) > There is also an option to drop some intermediate results, either using > caching or suppression. > However, there is no support for emit-on-change, in which results would be > forwarded only if the result has changed. 
This has been reported to be > extremely valuable as a performance optimization for some high-traffic > applications, and it reduces the computational burden both internally for > downstream Streams operations and for external systems that consume > the results, which currently have to deal with a lot of "no-op" changes. > It would be pretty straightforward to implement this, by loading the prior > result before a stateful operation and comparing it with the new result before > persisting or forwarding. In many cases, we load the prior result anyway, so > it may not be a significant performance impact either. > One design challenge is what to do with timestamps. If we get one record at > time 1 that produces a result, and then another at time 2 that produces a > no-op, what should be the timestamp of the result, 1 or 2? Emit-on-change > would require us to say 1. > Clearly, we'd need to do some serious benchmarks to evaluate any potential > implementation of emit-on-change. > Another design challenge is deciding whether we should automatically provide > emit-on-change for stateful operators, or whether it should be configurable. > Configuration increases complexity, so unless the performance impact is high, > we may just want to change the emission model without a configuration.
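The load-prior-and-compare approach described in the issue can be sketched as follows. This is an illustrative toy, not the Kafka Streams API; `state`, `process`, and `forwarded` are invented names, with a plain dict standing in for a state store.

```python
# Minimal sketch of emit-on-change: load the prior result, compare it with
# the new one, and forward only on change. On a no-op update, the original
# timestamp is kept, matching the "say 1" answer discussed above.

state = {}      # key -> (value, timestamp); stands in for a state store
forwarded = []  # records actually emitted downstream

def process(key, new_value, timestamp):
    prior = state.get(key)
    if prior is not None and prior[0] == new_value:
        # No-op update: keep the prior timestamp, forward nothing.
        return
    state[key] = (new_value, timestamp)
    forwarded.append((key, new_value, timestamp))

process("k", 1, 1)   # first result: forwarded
process("k", 1, 2)   # same value at time 2: suppressed, timestamp stays 1
process("k", 2, 3)   # changed value: forwarded
print(forwarded)     # [('k', 1, 1), ('k', 2, 3)]
```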
[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013370#comment-17013370 ] Richard Yu commented on KAFKA-8770: --- I have created a draft KIP for this JIRA. [~xmar] [~mjsax] [~vvcephei] Input would be greatly appreciated! Right now, I've not formalized any API additions / configuration changes. It would be good first if we can get some discussion on what is needed and what is not! [https://cwiki.apache.org/confluence/display/KAFKA/KIP-NUM%3A+Add+emit+on+change+support+for+Kafka+Streams#KIP-NUM:AddemitonchangesupportforKafkaStreams-DetailsonCoreImprovement]
[jira] [Comment Edited] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013230#comment-17013230 ] Richard Yu edited comment on KAFKA-8770 at 1/10/20 10:09 PM: - [~vvcephei] Actually, I did think of something which might be very useful as a performance enhancement. As mentioned in the JIRA description, Kafka Streams would load the prior result and compare it with the new one. However, that could be a severe hit to processing speed. I propose that instead of loading the prior result, we just get its hash code instead. If there is a no-op, the hash code of the prior result would be the same as the current one. However, if the result has _changed,_ then, provided the hash code function has been implemented correctly, the hash code would have changed correspondingly as well. Therefore, what should be done is the following: # We keep the hash codes of prior results in some store / whatever other device we might be able to use for storage. # Whenever we obtain a new processed result, retrieve the corresponding prior hash code to see whether it has changed. # Update the store / table as necessary if the hash code has changed. was (Author: yohan123): [~vvcephei] Actually, I did think of something which might be very useful as a performance enhancement. As mentioned in the JIRA description, Kafka Streams would load the prior result and compare it with the new one. However, that could be a severe hit to processing speed. I propose that instead of loading the prior result, we just get its hash code instead. If there is a no-op, the hash code of the prior result would be the same as the current one. However, if the result has _changed,_ then, provided the hash code function has been implemented correctly, the hash code would have changed correspondingly as well.
Therefore, what should be done is the following: # We keep the hash codes of prior results in some store / whatever other device we might be able to use for storage. # Whenever we obtain a new processed result, retrieve the corresponding prior hash code to see whether it has changed. # Update the store / table as necessary if the hash code has changed.
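The hash-based variant proposed in the comment above can be illustrated as follows. The store and function names are invented for the sketch; this is not the Kafka Streams API. Note also that a hash collision would make two different results look identical, so a real implementation might fall back to a full-value comparison when the hashes match.

```python
# Sketch of emit-on-change via stored hashes: instead of loading the full
# prior result, keep only a hash of it and compare hashes on each update.

import hashlib

hash_store = {}  # key -> hash of the last forwarded result
forwarded = []   # records actually emitted downstream

def digest(value):
    # Stable digest of the serialized value; stands in for a well-behaved
    # hashCode() in Java.
    return hashlib.sha256(repr(value).encode()).hexdigest()

def process(key, new_value):
    h = digest(new_value)
    if hash_store.get(key) == h:
        return  # hash unchanged: treat as a no-op, emit nothing
    hash_store[key] = h
    forwarded.append((key, new_value))

process("k", {"count": 1})
process("k", {"count": 1})  # unchanged: suppressed
process("k", {"count": 2})  # changed: forwarded
print(len(forwarded))       # 2
```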
[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013230#comment-17013230 ] Richard Yu commented on KAFKA-8770: --- [~vvcephei] Actually, I did think of something which might be very useful as a performance enhancement. As mentioned in the JIRA description, Kafka Streams would load the prior result and compare it with the new one. However, that could be a severe hit to processing speed. I propose that instead of loading the prior result, we just get its hash code instead. If there is a no-op, the hash code of the prior result would be the same as the current one. However, if the result has _changed,_ then, provided the hash code function has been implemented correctly, the hash code would have changed correspondingly as well. Therefore, what should be done is the following: # We keep the hash codes of prior results in some store / whatever other device we might be able to use for storage. # Whenever we obtain a new processed result, retrieve the corresponding prior hash code to see whether it has changed. # Update the store / table as necessary if the hash code has changed.
[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007860#comment-17007860 ] Richard Yu commented on KAFKA-8770: --- [~vvcephei] It seems that loading all the prior results would be wasteful. From what I could tell, it seems that we can pursue a caching strategy with this one i.e. use {{CachingKeyValueStore}} or the like if we want to implement this feature.
[jira] [Issue Comment Deleted] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9285: -- Comment: was deleted (was: It looks like I didn't do my research. Looks like this is already fixed by Kafka Connect.) > Implement failed message topic to account for processing lag during failure > --- > > Key: KAFKA-9285 > URL: https://issues.apache.org/jira/browse/KAFKA-9285 > Project: Kafka > Issue Type: New Feature > Components: consumer >Reporter: Richard Yu >Assignee: Richard Yu >Priority: Major > Labels: kip > > Presently, when a consumer crashes, the > user is typically responsible for both detecting and restarting the > failed consumer. Therefore, while the consumer is dead, no records are > consumed, and lag results. Previously, there have been attempts to resolve > this problem: when failure is detected by the broker, a substitute consumer is > started (the so-called [Rebalance > Consumer|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]) > which continues processing records in the failed consumer's stead. > However, this has complications: records are only stored locally, and > if this consumer fails as well, that data is lost. Instead, we > need to consider how we can still process these records and at the same time > effectively _persist_ them. Here I propose the concept of a > _failed message topic._ At a high level, it works like this: when we find > that a consumer has failed, messages which were originally meant to be sent to > that consumer are redirected to this failed message topic. The user can > choose to assign consumers to this topic, which consume the messages (that > would've originally been processed by the failed consumers) from it.
> Naturally, records from different topics cannot go into the same failed > message topic, since we could not tell which records belong to which consumer.
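The redirect described above can be sketched as a toy routing function. All names (`route`, `consumer_alive`, the topic names) are invented for illustration; this is not a Kafka API.

```python
# Toy sketch of the "failed message topic" idea: when a record's target
# consumer is down, redirect it to a per-source-topic failed-message topic
# instead of letting it sit unconsumed.

from collections import defaultdict

consumer_alive = {"orders-consumer": False, "payments-consumer": True}
delivered = defaultdict(list)      # records consumed normally
failed_topics = defaultdict(list)  # one failed-message topic per source topic

def route(source_topic, consumer, record):
    if consumer_alive.get(consumer, False):
        delivered[consumer].append(record)
    else:
        # Keep failed-message topics separate per source topic; otherwise we
        # could not tell which records belong to which consumer.
        failed_topics[f"{source_topic}.failed"].append(record)

route("orders", "orders-consumer", "o1")
route("payments", "payments-consumer", "p1")
route("orders", "orders-consumer", "o2")
print(dict(failed_topics))  # {'orders.failed': ['o1', 'o2']}
```

Consumers assigned to `orders.failed` could then drain those records, and the data is persisted in a topic rather than stored locally on a substitute consumer.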
[jira] [Resolved] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu resolved KAFKA-9285. --- Resolution: Fixed Already resolved by Kafka Connect.
[jira] [Commented] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990705#comment-16990705 ] Richard Yu commented on KAFKA-9285: --- It looks like I didn't do my research. Looks like this is already fixed by Kafka Connect.
[jira] [Updated] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9285: -- Labels: kip (was: )
[jira] [Assigned] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-9285: - Assignee: Richard Yu
[jira] [Updated] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-9285: -- Description: Presently, when a consumer crashes, the user is typically responsible for both detecting and restarting the failed consumer. Therefore, while the consumer is dead, no records are consumed, and lag results. Previously, there have been attempts to resolve this problem: when failure is detected by the broker, a substitute consumer is started (the so-called [Rebalance Consumer|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]) which continues processing records in the failed consumer's stead. However, this has complications: records are only stored locally, and if this consumer fails as well, that data is lost. Instead, we need to consider how we can still process these records and at the same time effectively _persist_ them. Here I propose the concept of a _failed message topic._ At a high level, it works like this: when we find that a consumer has failed, messages which were originally meant to be sent to that consumer are redirected to this failed message topic. The user can choose to assign consumers to this topic, which consume the messages (that would've originally been processed by the failed consumers) from it. Naturally, records from different topics cannot go into the same failed message topic, since we could not tell which records belong to which consumer. was: Presently, when a consumer crashes, the user is typically responsible for both detecting and restarting the failed consumer. Therefore, while the consumer is dead, no records are consumed, and lag results. Previously, there have been attempts to resolve this problem: when failure is detected by the broker, a substitute consumer is started (the so-called [Rebalance Consumer|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]) which continues processing records in the failed consumer's stead. However, this has complications: records are only stored locally, and if this consumer fails as well, that data is lost. Instead, we need to consider how we can still process these records and at the same time effectively _persist_ them. Here I propose the concept of a _failed message topic._ At a high level, it works like this: when we find that a consumer has failed, messages which were originally meant to be sent to that consumer are redirected to this failed message topic. The user can choose to assign consumers to this topic, which would consume messages from failed consumers while other consumer threads are down. Naturally, records from different topics cannot go into the same failed message topic, since we could not tell which records belong to which consumer.
[jira] [Commented] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990303#comment-16990303 ] Richard Yu commented on KAFKA-9285: --- cc [~bchen225242] You might be interested in this.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
Richard Yu created KAFKA-9285: - Summary: Implement failed message topic to account for processing lag during failure Key: KAFKA-9285 URL: https://issues.apache.org/jira/browse/KAFKA-9285 Project: Kafka Issue Type: New Feature Components: consumer Reporter: Richard Yu -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985408#comment-16985408 ] Richard Yu commented on KAFKA-8769: --- [~vvcephei] [~mjsax] I'm not sure if you read the KIP, but it explicitly states that the user is allowed control over which system of tracking they want. If they don't want per-key stream time tracking, they can disable it (or enable it if they wish; our default policy can remain per-partition tracking). It seems some users still have problems with per-partition tracking anyway, so I'd imagine this would be a welcome fix for them. There are workarounds which avoid making the low-traffic problem worse, as I mentioned in earlier discussion, but I don't know if that is acceptable for you. > Consider computing stream time independently per key > > > Key: KAFKA-8769 > URL: https://issues.apache.org/jira/browse/KAFKA-8769 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-discussion, needs-kip > > Currently, Streams uses a concept of "stream time", which is computed as the > highest timestamp observed by stateful operators, per partition. This concept > of time backs grace period, retention time, and suppression. > For use cases in which data is produced to topics in roughly chronological > order (as in db change capture), this reckoning is fine. > Some use cases have a different pattern, though. For example, in IoT > applications, it's common for sensors to save up quite a bit of data and then > dump it all at once into the topic. See > https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware > for a concrete example of the use case. > I have heard of cases where each sensor dumps 24 hours' worth of data at a > time into the topic. 
This results in a pattern in which, when reading a > single partition, the operators observe a lot of consecutive records for one > key that increase in timestamp for 24 hours, then a bunch of consecutive > records for another key that are also increasing in timestamp over the same > 24 hour period. With our current stream-time definition, this means that the > partition's stream time increases while reading the first key's data, but > then stays paused while reading the second key's data, since the second batch > of records all have timestamps in the "past". > E.g: > {noformat} > A@t0 (stream time: 0) > A@t1 (stream time: 1) > A@t2 (stream time: 2) > A@t3 (stream time: 3) > B@t0 (stream time: 3) > B@t1 (stream time: 3) > B@t2 (stream time: 3) > B@t3 (stream time: 3) > {noformat} > This pattern results in an unfortunate compromise in which folks are required > to set the grace period to the max expected time skew, for example 24 hours, > or Streams will just drop the second key's data (since it is late). But, this > means that if they want to use Suppression for "final results", they have to > wait 24 hours for the result. > This tradeoff is not strictly necessary, though, because each key represents > a logically independent sequence of events. Tracking by partition is simply > convenient, but typically not logically meaningful. That is, the partitions > are just physically independent sequences of events, so it's convenient to > track stream time at this granularity. It would be just as correct, and more > useful for IOT-like use cases, to track time independently for each key. > However, before considering this change, we need to solve the > testing/low-traffic problem. This is the opposite issue, where a partition > doesn't get enough traffic to advance stream time and results remain "stuck" > in the suppression buffers. 
We can provide some mechanism to force the > advancement of time across all partitions, for use in testing when you want > to flush out all results, or in production when some topic is low volume. We > shouldn't consider tracking time _more_ granularly until this problem is > solved, since it would just make the low-traffic problem worse. -- This message was sent by Atlassian Jira (v8.3.4#803005)
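The A/B example in the description can be reproduced with a short sketch (plain Python, not Streams code) contrasting today's per-partition stream time with the proposed per-key tracking; the function names are made up for illustration.

```python
# Sketch: per-partition vs. per-key stream time, using the A@t0..B@t3
# sequence from the issue description.

def per_partition_stream_time(records):
    """Stream time = highest timestamp seen so far on the partition."""
    st, out = -1, []
    for _key, ts in records:
        st = max(st, ts)
        out.append(st)
    return out

def per_key_stream_time(records):
    """Each key advances its own stream-time clock independently."""
    clocks, out = {}, []
    for key, ts in records:
        clocks[key] = max(clocks.get(key, -1), ts)
        out.append(clocks[key])
    return out

records = [("A", 0), ("A", 1), ("A", 2), ("A", 3),
           ("B", 0), ("B", 1), ("B", 2), ("B", 3)]

# Per partition: time is stuck at 3 while B's batch is read, so with a
# small grace period every B record would be considered late and dropped.
assert per_partition_stream_time(records) == [0, 1, 2, 3, 3, 3, 3, 3]

# Per key: B gets its own clock, so its records arrive "on time".
assert per_key_stream_time(records) == [0, 1, 2, 3, 0, 1, 2, 3]
```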
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16971888#comment-16971888 ] Richard Yu commented on KAFKA-8769: --- [~vvcephei] [~mjsax] [~bbejeck] and anyone else who wishes to have some input, I made KIP-540 for this issue. I just want some discussion on how we would go about implementing this system. Below is the link: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-540%3A+Implement+per+key+stream+time+tracking]
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967926#comment-16967926 ] Richard Yu commented on KAFKA-8769: --- [~vvcephei] Do you have any ideas on advancing stream time? It seems unnatural to do it artificially, so I wonder if we can just move ahead with the per-key stream time part of the issue, since that seems to be the meat of it.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965404#comment-16965404 ] Richard Yu commented on KAFKA-8769: --- Adding on to how KIP-539 is different from KIP-424: KIP-424's sole intention was to add a suppress() operator which can flush on wall-clock time. KIP-539's original intention, on the other hand, was not to add this method, but just to flush out records which had stayed too long in the suppression buffer (again based on wall-clock time).
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965402#comment-16965402 ] Richard Yu commented on KAFKA-8769: --- Hi [~mjsax] I wasn't really aware of the suppress() contract, so it hadn't occurred to me that suppressing based on wall-clock time would be a bad idea. [~vvcephei] did indicate to me that KIP-424 had some overlapping use cases with this KIP, so I thought it would be convenient to introduce both of them at once. Although, it is clear from your opinion that you don't believe flushing based on wall-clock time would be ideal. In that case, should we just proceed with implementing per-key stream time tracking first? The issue [~vvcephei] believed needed to be solved first was somehow getting rid of the records that are stuck in the buffers, but really, they can only be evicted on two time systems: stream time or wall clock. The second option is out. As for the first, how would we realistically advance stream time? Do we artificially increase it somehow? That doesn't seem right, since the records sent by the user are what determine it. So I'm at a loss as to how we should fix this low-traffic suppression issue. [~vvcephei] Your thoughts? I do have a solution which can avoid making the low-traffic suppression problem worse, and that is to maintain the old eviction behavior. That is: in per-key stream time tracking, it was assumed that we evict on _per key_ stream time, in which case it will make the problem worse. However, if we still evict on _per partition_ stream time, the logic remains the same, and the problem does not become any worse. In that case, we can use the finer granularity of per-key stream time tracking for the calculation of grace periods, but not for suppression buffers. [~mjsax] how does this sound? 
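The hybrid scheme floated in the comment above can be sketched as follows (plain Python, all names hypothetical): lateness is judged against each key's own clock, while suppression-buffer eviction still uses the partition clock, so the low-traffic problem is no worse than today.

```python
# Sketch of the proposed compromise: per-key stream time for grace-period
# (late-record) checks, per-partition stream time for buffer eviction.

def process(records, grace):
    partition_time = -1
    key_time = {}
    buffer, emitted, dropped = {}, [], []
    for key, ts in records:
        key_time[key] = max(key_time.get(key, -1), ts)
        # Lateness judged against the key's own clock.
        if ts < key_time[key] - grace:
            dropped.append((key, ts))
            continue
        buffer[key] = ts  # latest buffered result per key
        partition_time = max(partition_time, ts)
        # Eviction still judged against the partition clock.
        ripe = [k for k, t in buffer.items() if t <= partition_time - grace]
        for k in ripe:
            emitted.append((k, buffer.pop(k)))
    return emitted, dropped

records = [("A", 0), ("A", 5), ("B", 0), ("B", 5)]
emitted, dropped = process(records, grace=2)

# B@0 is "late" by partition time (already at 5) but on time by B's own
# clock, so it is accepted rather than dropped, and the partition clock
# promptly evicts it from the buffer.
assert dropped == []
assert ("B", 0) in emitted
```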
[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961615#comment-16961615 ] Richard Yu edited comment on KAFKA-8769 at 10/29/19 2:39 AM: - [~vvcephei] [~mjsax] Bumping the KIP. Want to see if we can get the KIP in. was (Author: yohan123): Bumping the KIP. Want to see if we can get the KIP in.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961615#comment-16961615 ] Richard Yu commented on KAFKA-8769: --- Bumping the KIP. Want to see if we can get the KIP in.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959308#comment-16959308 ]

Richard Yu commented on KAFKA-8522:
-----------------------------------

Found where the tombstones are inserted. That really just answered my question above. My bad.

> Tombstones can survive forever
> ------------------------------
>
>                 Key: KAFKA-8522
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8522
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log cleaner
>            Reporter: Evelyn Bayes
>            Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly
> unintended behaviour.
>
> Under specific conditions tombstones effectively survive forever:
> * Small amount of throughput;
> * min.cleanable.dirty.ratio near or at 0; and
> * Other parameters at default.
>
> What happens is all the data continuously gets cycled into the oldest
> segment. Old records get compacted away, but the new records continuously
> update the timestamp of the oldest segment, resetting the countdown for
> deleting tombstones.
>
> So tombstones build up in the oldest segment forever.
>
> While you could "fix" this by reducing the segment size, this can be
> undesirable as a sudden change in throughput could cause a dangerous number
> of segments to be created.
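The feedback loop in the issue description can be sketched as follows. This is a hypothetical model, not the actual LogCleaner logic: it assumes the simplified rule that a tombstone in a cleaned segment becomes deletable only after delete.retention.ms has elapsed since that segment was last modified.

```java
public class TombstoneSketch {
    /**
     * Simplified rule: a tombstone in a cleaned segment may be removed only
     * once delete.retention.ms has elapsed since the segment was last modified.
     */
    static boolean tombstoneDeletable(long segmentLastModifiedMs, long nowMs, long deleteRetentionMs) {
        return nowMs - segmentLastModifiedMs >= deleteRetentionMs;
    }

    public static void main(String[] args) {
        long deleteRetentionMs = 86_400_000L; // delete.retention.ms default: 24h

        // Quiet segment: nothing touches it, so after 24h the tombstone
        // becomes eligible for deletion.
        System.out.println(tombstoneDeletable(0L, deleteRetentionMs, deleteRetentionMs)); // true

        // Low throughput + near-zero min.cleanable.dirty.ratio: every pass
        // cycles fresh records into the oldest segment, bumping its
        // modification time and restarting the countdown.
        long lastModified = 0L;
        long now = 0L;
        for (int pass = 0; pass < 10; pass++) {
            now += deleteRetentionMs / 2;  // a cleaning pass every 12h
            lastModified = now;            // new records landed in the segment
        }
        // The countdown never completes; the tombstone survives.
        System.out.println(tombstoneDeletable(lastModified, now, deleteRetentionMs)); // false
    }
}
```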
[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958388#comment-16958388 ]

Richard Yu edited comment on KAFKA-8522 at 10/23/19 11:59 PM:
--------------------------------------------------------------

Hi [~junrao] [~hachikuji] Just have a question involving implementation of the KIP. Where is the base timestamp for the RecordBatch (batch header v2) assigned its value? I'm asking because I'm having a bit of trouble locating where the base timestamp's value is assigned.

was (Author: yohan123):
Hi [~junrao] [~hachikuji] Just have a question involving implementation of the KIP. Where is the base timestamp for the RecordBatch (batch header v2) defined in Kafka? I'm asking because I'm having a bit of trouble locating where the base timestamp's value is assigned.
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958388#comment-16958388 ]

Richard Yu commented on KAFKA-8522:
-----------------------------------

Hi [~junrao] [~hachikuji] Just have a question involving implementation of the KIP. Where is the base timestamp for the RecordBatch (batch header v2) defined in Kafka? I'm asking because I'm having a bit of trouble locating where the base timestamp's value is assigned.
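For context on the base-timestamp question: in the v2 batch format, the batch header carries a single base timestamp and each record stores only a varint delta relative to it, so the full timestamp is reconstructed on read. The sketch below illustrates that relationship only; the method and class names are hypothetical, not Kafka's actual classes, and where the base value is assigned during batch construction is exactly the question being asked.

```java
public class BatchTimestampSketch {
    /** Reader side: full timestamp = batch base timestamp + record delta. */
    static long recordTimestamp(long batchBaseTimestamp, long recordTimestampDelta) {
        return batchBaseTimestamp + recordTimestampDelta;
    }

    /** Writer side: each record stores only its delta from the batch base. */
    static long recordDelta(long batchBaseTimestamp, long recordTimestamp) {
        return recordTimestamp - batchBaseTimestamp;
    }

    public static void main(String[] args) {
        long base = 1_571_000_000_000L; // base timestamp in the batch header
        long delta = recordDelta(base, 1_571_000_000_250L);
        System.out.println(delta);                        // 250
        System.out.println(recordTimestamp(base, delta)); // 1571000000250
    }
}
```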
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958182#comment-16958182 ]

Richard Yu commented on KAFKA-8769:
-----------------------------------

Hi all, I have updated the KIP to include an extra feature as well due to overlapping usecases. It would be great if we can get some input. :)
[jira] [Issue Comment Deleted] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Yu updated KAFKA-8769:
------------------------------
    Comment: was deleted

(was: [~vvcephei] Just a thought. Would the low traffic problem be made significantly worse by per key stream time tracking? I think that it would only be marginally worse at best than the current situation we have now. What we could do to get around this problem is that the logic in the suppression buffers stays the same (as it did before so that the problem does not get worse). What this means is that, yes, each individual key will have a different stream time. But when we evict records, we evict them based on the maximum of stream times _of_ all keys. This means that the low traffic problem is not made worse, it remains the same. I think this is acceptable. WDYT?)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955611#comment-16955611 ]

Richard Yu commented on KAFKA-8769:
-----------------------------------

Alright, so I made a KIP for the low traffic problem here:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-539%3A+Implement+mechanism+to+flush+out+records+in+low+traffic+suppression+buffers]

Hope this helps! :)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955053#comment-16955053 ]

Richard Yu commented on KAFKA-8769:
-----------------------------------

[~vvcephei] Just a thought. Would the low traffic problem be made significantly worse by per key stream time tracking? I think that it would only be marginally worse at best than the current situation we have now. What we could do to get around this problem is that the logic in the suppression buffers stays the same (as it did before so that the problem does not get worse). What this means is that, yes, each individual key will have a different stream time. But when we evict records, we evict them based on the maximum of stream times _of_ all keys. This means that the low traffic problem is not made worse, it remains the same. I think this is acceptable. WDYT?
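The idea in the comment above — judge lateness against each key's own stream time, but drive suppression-buffer eviction off the maximum across all keys so the low-traffic behaviour is unchanged — could look roughly like this. A hypothetical sketch, not a proposed Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

public class PerKeyStreamTimeSketch {
    private final Map<String, Long> perKeyTime = new HashMap<>();
    private long maxStreamTime = Long.MIN_VALUE;

    void observe(String key, long ts) {
        perKeyTime.merge(key, ts, Math::max);
        maxStreamTime = Math.max(maxStreamTime, ts);
    }

    /** Lateness is judged against the key's own stream time. */
    boolean isLate(String key, long ts, long grace) {
        long keyTime = perKeyTime.getOrDefault(key, Long.MIN_VALUE);
        return keyTime != Long.MIN_VALUE && ts < keyTime - grace;
    }

    /** Eviction still keys off the max across all keys, so the
     *  low-traffic problem is no worse than today. */
    boolean readyToEvict(long bufferedRecordTs, long suppressForMs) {
        return maxStreamTime - bufferedRecordTs >= suppressForMs;
    }

    public static void main(String[] args) {
        PerKeyStreamTimeSketch t = new PerKeyStreamTimeSketch();
        // A's data first, then B's "old" data, as in the issue description.
        for (long ts = 0; ts <= 3; ts++) t.observe("A", ts);
        // Under per-key tracking B@t0 is not late even with grace 0,
        // because B's own stream time has not advanced yet.
        System.out.println(t.isLate("B", 0, 0)); // false
        t.observe("B", 0);
        // Eviction still uses the global max (3), not B's time (0).
        System.out.println(t.readyToEvict(0, 3)); // true
    }
}
```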
[jira] [Updated] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Yu updated KAFKA-8769:
------------------------------
    Labels: needs-discussion needs-kip  (was: )
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16919901#comment-16919901 ]

Richard Yu commented on KAFKA-8522:
-----------------------------------

Oh, alright. I will see how things go and create a KIP then.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918980#comment-16918980 ]

Richard Yu edited comment on KAFKA-8522 at 8/29/19 10:06 PM:
-------------------------------------------------------------

[~junrao] I think I've hit a caveat with your approach. The problem I've encountered here is that the partitions that are "assigned" to a LogCleaner could fluctuate after the LogCleaner instance is constructed. This has some implications because new TopicPartitions could be added to or removed from this "assignment". The consequences are that files are created and removed far more often than comfortable under certain conditions (not completely sure here). For details, I noticed that in the LogCleanerManager constructor, the {{logs}} parameter (the equivalent of the "assignment") is essentially a ConcurrentMap which can have its contents change after initialization. That means files also have to be repeatedly created and destroyed. Your thoughts on this?

was (Author: yohan123):
[~junrao] I think I've hit a caveat with your approach. The problem I've encountered here is that the partitions that are "assigned" to a LogCleaner could fluctuate after the LogCleaner instance is constructed. This has some implications because new TopicPartitions could be added to or removed from this "assignment". The consequences are that files are created and removed far more often than comfortable under certain conditions. For details, I noticed that in the LogCleanerManager constructor, the {{logs}} parameter (the equivalent of the "assignment") is essentially a ConcurrentMap which can have its contents change after initialization. That means files also have to be repeatedly created and destroyed. Your thoughts on this?
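The churn described in the comment can be illustrated with a toy reconciliation loop: if one checkpoint file is kept per currently-assigned partition, every change to the mutable, ConcurrentMap-backed assignment forces file creates and deletes. This is a hypothetical sketch, not the LogCleanerManager implementation:

```java
import java.util.HashSet;
import java.util.Set;

public class CheckpointChurnSketch {
    int creates = 0;
    int deletes = 0;
    final Set<String> checkpointFiles = new HashSet<>();

    /** Keep exactly one checkpoint file per currently assigned partition:
     *  create files for new partitions, delete files for removed ones. */
    void reconcile(Set<String> assignedPartitions) {
        for (String p : assignedPartitions) {
            if (checkpointFiles.add(p)) creates++;
        }
        checkpointFiles.removeIf(p -> {
            if (!assignedPartitions.contains(p)) { deletes++; return true; }
            return false;
        });
    }

    public static void main(String[] args) {
        CheckpointChurnSketch mgr = new CheckpointChurnSketch();
        // The assignment fluctuates after construction, as with the
        // ConcurrentMap-backed `logs` parameter.
        mgr.reconcile(Set.of("t-0", "t-1"));
        mgr.reconcile(Set.of("t-1", "t-2"));
        mgr.reconcile(Set.of("t-0", "t-1"));
        // Far more file operations than a stable 2-partition
        // assignment would need.
        System.out.println(mgr.creates + " creates, " + mgr.deletes + " deletes"); // 4 creates, 2 deletes
    }
}
```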
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918985#comment-16918985 ]

Richard Yu commented on KAFKA-8522:
-----------------------------------

I have some thoughts on getting around this. The best approach I can think of is that once a topic partition and its respective checkpoint file are created, we don't remove the entry from the partition-to-checkpoint map. Once the LogCleanerManager instance is destroyed (except when {{partitions}} are having their checkpoint files moved from one directory to another), we remove all files.
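The workaround sketched in the comment above — never dropping a partition's entry once its checkpoint file is created, and deleting the files only when the manager shuts down — would avoid per-reassignment churn. Again a hypothetical sketch, not Kafka code:

```java
import java.util.HashSet;
import java.util.Set;

public class StickyCheckpointSketch {
    int creates = 0;
    int deletes = 0;
    private final Set<String> checkpointFiles = new HashSet<>();

    /** Create files for newly seen partitions, but never delete on
     *  unassignment: the entry outlives the assignment. */
    void onAssignment(Set<String> assignedPartitions) {
        for (String p : assignedPartitions) {
            if (checkpointFiles.add(p)) creates++;
        }
    }

    /** Files are removed only when the manager is destroyed. */
    void close() {
        deletes += checkpointFiles.size();
        checkpointFiles.clear();
    }

    public static void main(String[] args) {
        StickyCheckpointSketch mgr = new StickyCheckpointSketch();
        // The same fluctuating assignment as before, but now each file is
        // created once and deleted once, at shutdown.
        mgr.onAssignment(Set.of("t-0", "t-1"));
        mgr.onAssignment(Set.of("t-1", "t-2"));
        mgr.onAssignment(Set.of("t-0", "t-1"));
        mgr.close();
        System.out.println(mgr.creates + " creates, " + mgr.deletes + " deletes"); // 3 creates, 3 deletes
    }
}
```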
[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918980#comment-16918980 ] Richard Yu edited comment on KAFKA-8522 at 8/29/19 10:01 PM: - [~junrao] I think I've hit a caveat with your approach. The problem I've encountered here is that the partitions that are "assigned" to a LogCleaner could fluctuate after the LogCleaner instance is constructed. This has some implications because new TopicPartitions could be added to or removed from this "assignment". The consequences are that files are created and removed far more often than comfortable under certain conditions. For details, I noticed that in LogCleanerManager constructor, the {{logs}} parameter (the equivalent of the "assignment") is essentially a ConcurrentMap which can have its contents change after initialization. That means files also have to be repeatedly created and destroyed. Your thoughts on this? was (Author: yohan123): @Jun Rao I think I've hit a caveat with your approach. The problem I've encountered here is that the partitions that are "assigned" to a LogCleaner could fluctuate after the LogCleaner instance is constructed. This has some implications because new TopicPartitions could be added to or removed from this "assignment". The consequences are that files are created and removed far more often than comfortable under certain conditions. For details, I noticed that in LogCleanerManager constructor, the {{logs}} parameter (the equivalent of the "assignment") is essentially a ConcurrentMap which can have its contents change after initialization. That means files also have to be repeatedly created and destroyed. Your thoughts on this? 
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918980#comment-16918980 ] Richard Yu commented on KAFKA-8522: --- @Jun Rao I think I've hit a snag with your approach. The problem is that the partitions "assigned" to a LogCleaner can fluctuate after the LogCleaner instance is constructed: new TopicPartitions can be added to or removed from this "assignment". The consequence is that, under certain conditions, files are created and removed far more often than is comfortable. Specifically, I noticed that in the LogCleanerManager constructor, the {{logs}} parameter (the equivalent of the "assignment") is essentially a ConcurrentMap whose contents can change after initialization. That means files also have to be repeatedly created and destroyed. Your thoughts on this?
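The hazard described in the comment above can be illustrated with a short sketch (the names CleanerView and assignedPartitionCount are hypothetical, not Kafka code): a ConcurrentMap handed to a constructor remains a live view, so the "assignment" seen by the holder changes whenever the owner mutates the map afterwards.

```java
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch: the constructor takes no defensive copy, so the
// holder observes mutations made by the map's owner after construction.
public class CleanerView {
    private final ConcurrentMap<String, Long> logs; // shared, live view

    public CleanerView(ConcurrentMap<String, Long> logs) {
        this.logs = logs; // contents may change after this point
    }

    public int assignedPartitionCount() {
        return logs.size(); // reflects later additions/removals
    }
}
```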
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911942#comment-16911942 ] Richard Yu commented on KAFKA-8522: --- Alright, sounds good.
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909878#comment-16909878 ] Richard Yu commented on KAFKA-8522: --- So in summary, should I interpret your suggestion as creating multiple checkpoint files per logDir (a logDir corresponds to a single disk)? In that case, multiple files would be created for each directory. I guess that is probably what you had in mind.
[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909876#comment-16909876 ] Richard Yu edited comment on KAFKA-8522 at 8/18/19 3:22 AM: The code below is located in LogManager.scala; it is the call used to construct the LogManager: {code:java} new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile), ...) {code} The variable {{config}} is a KafkaConfig instance. The {{logDirs}} you see here are the ones that will eventually be used in {{LogCleanerManager}}. This is where I drew the conclusion in my previous comment from.
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909876#comment-16909876 ] Richard Yu commented on KAFKA-8522: --- The code below is located in LogManager.scala; it is the call used to construct the LogManager: {code:java} new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile), initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile), topicConfigs = topicConfigs, initialDefaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir, flushCheckMs = config.logFlushSchedulerIntervalMs, flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, maxPidExpirationMs = config.transactionIdExpirationMs, scheduler = kafkaScheduler, brokerState = brokerState, brokerTopicStats = brokerTopicStats, logDirFailureChannel = logDirFailureChannel, time = time) {code} The variable {{config}} is a KafkaConfig instance. The {{logDirs}} you see here are the ones that will eventually be used in {{LogCleanerManager}}. This is where I drew the conclusion in my previous comment from.
[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909866#comment-16909866 ] Richard Yu edited comment on KAFKA-8522 at 8/18/19 3:08 AM: [~junrao] Just want some clarification on something. When {{LogCleanerManager}} is created, a {{logDirs}} parameter is passed: a sequence of files the program can write into. These file paths are supplied by the user via the {{logDirs}} property in {{KafkaConfig}}. This is where I got a little confused. I assumed that the number of files is limited by the user to one per disk (since KafkaConfig's values should be controlled by the user), and this is where I think there could be a problem. In a real-world situation, the number of partitions typically exceeds the number of disks available. In other words, if we pair off one checkpoint file per partition, some partitions would have no checkpoint file to write to. Is my understanding of the situation correct? I am not completely sure about this, but this is how it appeared to me.
[jira] [Updated] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-8770: -- Labels: needs-kip (was: ) > Either switch to or add an option for emit-on-change > > > Key: KAFKA-8770 > URL: https://issues.apache.org/jira/browse/KAFKA-8770 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Currently, Streams offers two emission models: > * emit-on-window-close: (using Suppression) > * emit-on-update: (i.e., emit a new result whenever a new record is > processed, regardless of whether the result has changed) > There is also an option to drop some intermediate results, either using > caching or suppression. > However, there is no support for emit-on-change, in which results would be > forwarded only if the result has changed. This has been reported to be > extremely valuable as a performance optimization for some high-traffic > applications, and it reduces the computational burden both internally for > downstream Streams operations, as well as for external systems that consume > the results, and currently have to deal with a lot of "no-op" changes. > It would be pretty straightforward to implement this, by loading the prior > results before a stateful operation and comparing with the new result before > persisting or forwarding. In many cases, we load the prior result anyway, so > it may not be a significant performance impact either. > One design challenge is what to do with timestamps. If we get one record at > time 1 that produces a result, and then another at time 2 that produces a > no-op, what should be the timestamp of the result, 1 or 2? emit-on-change > would require us to say 1. > Clearly, we'd need to do some serious benchmarks to evaluate any potential > implementation of emit-on-change. 
> Another design challenge is to decide if we should just automatically provide > emit-on-change for stateful operators, or if it should be configurable. > Configuration increases complexity, so unless the performance impact is high, > we may just want to change the emission model without a configuration. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
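The "load the prior result and compare" idea in the description above can be sketched in a few lines (EmitOnChangeStore and putIfChanged are illustrative names, not Streams APIs): a stateful operator would forward an update downstream only when it differs from the stored prior value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Minimal sketch of emit-on-change: store the new result and report whether
// it actually changed relative to the prior result for the same key.
public class EmitOnChangeStore<K, V> {
    private final Map<K, V> store = new HashMap<>();

    // Returns true if the update should be forwarded (i.e., it is not a no-op).
    public boolean putIfChanged(K key, V newValue) {
        V prior = store.put(key, newValue);
        return !Objects.equals(prior, newValue); // suppress no-op updates
    }
}
```

Since the prior result is often loaded anyway for the stateful operation, the extra comparison is the only added cost in this sketch.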
[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909866#comment-16909866 ] Richard Yu edited comment on KAFKA-8522 at 8/18/19 2:18 AM: [~junrao] Just want some clarification on something. When {{LogCleanerManager}} is created, a {{logDirs}} parameter is passed (a sequence of {{File}}s) that is used to determine the checkpoint files one can write into. After some research, I discovered that these file paths are given by KafkaConfig's {{logDirs}} property. This is where I got a little confused. I assumed that the number of files is limited by the user to one per disk (since KafkaConfig's values should be controlled by the user), and this is where I think there could be a problem. In a real-world situation, the number of partitions typically exceeds the number of disks available. In other words, if we pair off one checkpoint file per partition, some partitions would have no checkpoint file to write to. Is my understanding of the situation correct? I am not completely sure about this, but this is how it appeared to me.
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909866#comment-16909866 ] Richard Yu commented on KAFKA-8522: --- [~junrao] Just want some clarification on something. When {{LogCleanerManager}} is created, a {{logDirs}} parameter is passed (a sequence of {{File}}s) that is used to determine the checkpoint files one can write into. After some research, I discovered that these file paths are given by KafkaConfig's {{logDirs}} property. This is where I got a little confused. I assumed that the number of files is limited by the user to one per disk (since KafkaConfig's values should be controlled by the user), and this is where I think there could be a problem. In a real-world situation, the number of partitions typically exceeds the number of disks available. In other words, if we pair off one checkpoint file per partition, some partitions would have no checkpoint file to write to. Is my understanding of the situation correct? I am not completely sure about this, but this is how it appeared to me.
[jira] [Commented] (KAFKA-8806) Kafka.poll spends significant amount of time in KafkaConsumer.updateAssignmentMetadataIfNeeded
[ https://issues.apache.org/jira/browse/KAFKA-8806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909445#comment-16909445 ] Richard Yu commented on KAFKA-8806: --- [~vvcephei] Since you wrote this part of the code, your thoughts on this? > Kafka.poll spends significant amount of time in > KafkaConsumer.updateAssignmentMetadataIfNeeded > -- > > Key: KAFKA-8806 > URL: https://issues.apache.org/jira/browse/KAFKA-8806 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.0 >Reporter: Xavier Léauté >Assignee: David Arthur >Priority: Major > > Comparing the performance profile of 2.2.0 and 2.3.0, we are seeing > significant performance differences in the > {{KafkaConumer.updateAssignmentMetadataIfNeeded()}} method. > The call to {{KafkaConsumer.updateAssignmentMetadataIfNeeded()}} now > represents roughly 40% of CPU time spent in {{KafkaConsumer.poll()}}, when > before it only represented less than 2%. > Most of the extra time appears to be spent in > {{KafkaConsumer.validateOffsetsIfNeeded()}}, which did not show up in > previous profiles. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906692#comment-16906692 ] Richard Yu edited comment on KAFKA-8769 at 8/13/19 10:50 PM: - Notably, a problem with this approach is that keys evicted from the cache will no longer have their timestamps tracked. If we receive a record whose key has no tracked timestamp, we could insert the key into the cache with that record's timestamp as the key's largest time. Admittedly, the order in which we receive records does not guarantee monotonically increasing timestamps, but the latest record's timestamp is a fair approximation. > Consider computing stream time independently per key > > > Key: KAFKA-8769 > URL: https://issues.apache.org/jira/browse/KAFKA-8769 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > Currently, Streams uses a concept of "stream time", which is computed as the > highest timestamp observed by stateful operators, per partition. This concept > of time backs grace period, retention time, and suppression. > For use cases in which data is produced to topics in roughly chronological > order (as in db change capture), this reckoning is fine. > Some use cases have a different pattern, though. 
For example, in IOT > applications, it's common for sensors to save up quite a bit of data and then > dump it all at once into the topic. See > https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware > for a concrete example of the use case. > I have heard of cases where each sensor dumps 24 hours' worth of data at a > time into the topic. This results in a pattern in which, when reading a > single partition, the operators observe a lot of consecutive records for one > key that increase in timestamp for 24 hours, then a bunch of consecutive > records for another key that are also increasing in timestamp over the same > 24 hour period. With our current stream-time definition, this means that the > partition's stream time increases while reading the first key's data, but > then stays paused while reading the second key's data, since the second batch > of records all have timestamps in the "past". > E.g: > {noformat} > A@t0 (stream time: 0) > A@t1 (stream time: 1) > A@t2 (stream time: 2) > A@t3 (stream time: 3) > B@t0 (stream time: 3) > B@t1 (stream time: 3) > B@t2 (stream time: 3) > B@t3 (stream time: 3) > {noformat} > This pattern results in an unfortunate compromise in which folks are required > to set the grace period to the max expected time skew, for example 24 hours, > or Streams will just drop the second key's data (since it is late). But, this > means that if they want to use Suppression for "final results", they have to > wait 24 hours for the result. > This tradeoff is not strictly necessary, though, because each key represents > a logically independent sequence of events. Tracking by partition is simply > convenient, but typically not logically meaningful. That is, the partitions > are just physically independent sequences of events, so it's convenient to > track stream time at this granularity. It would be just as correct, and more > useful for IOT-like use cases, to track time independently for each key. 
> However, before considering this change, we need to solve the > testing/low-traffic problem. This is the opposite issue, where a partition > doesn't get enough traffic to advance stream time and results remain "stuck" > in the suppression buffers. We can provide some mechanism to force the > advancement of time across all partitions, for use in testing when you want > to flush out all results, or in production when some topic is low volume. We > shouldn't consider tracking time _more_ granularly until this problem is > solved, since it would just make the low-traffic problem worse. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906692#comment-16906692 ] Richard Yu commented on KAFKA-8769: --- Notably, a problem with this approach is that keys which are evicted from the cache will no longer have their timestamps tracked. If the situation occurs when we receive a record whose key has no tracked timestamp, then we could insert the key with that record timestamp as that particular key's partition time into the cache. Admittedly, the order in which we receive records does not guarantee that we have a monotonically increasing timestamp, but the latest records we receive and its timestamp are fair approximations. > Consider computing stream time independently per key > > > Key: KAFKA-8769 > URL: https://issues.apache.org/jira/browse/KAFKA-8769 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > Currently, Streams uses a concept of "stream time", which is computed as the > highest timestamp observed by stateful operators, per partition. This concept > of time backs grace period, retention time, and suppression. > For use cases in which data is produced to topics in roughly chronological > order (as in db change capture), this reckoning is fine. > Some use cases have a different pattern, though. For example, in IOT > applications, it's common for sensors to save up quite a bit of data and then > dump it all at once into the topic. See > https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware > for a concrete example of the use case. > I have heard of cases where each sensor dumps 24 hours' worth of data at a > time into the topic. 
This results in a pattern in which, when reading a > single partition, the operators observe a lot of consecutive records for one > key that increase in timestamp for 24 hours, then a bunch of consecutive > records for another key that are also increasing in timestamp over the same > 24 hour period. With our current stream-time definition, this means that the > partition's stream time increases while reading the first key's data, but > then stays paused while reading the second key's data, since the second batch > of records all have timestamps in the "past". > E.g: > {noformat} > A@t0 (stream time: 0) > A@t1 (stream time: 1) > A@t2 (stream time: 2) > A@t3 (stream time: 3) > B@t0 (stream time: 3) > B@t1 (stream time: 3) > B@t2 (stream time: 3) > B@t3 (stream time: 3) > {noformat} > This pattern results in an unfortunate compromise in which folks are required > to set the grace period to the max expected time skew, for example 24 hours, > or Streams will just drop the second key's data (since it is late). But, this > means that if they want to use Suppression for "final results", they have to > wait 24 hours for the result. > This tradeoff is not strictly necessary, though, because each key represents > a logically independent sequence of events. Tracking by partition is simply > convenient, but typically not logically meaningful. That is, the partitions > are just physically independent sequences of events, so it's convenient to > track stream time at this granularity. It would be just as correct, and more > useful for IOT-like use cases, to track time independently for each key. > However, before considering this change, we need to solve the > testing/low-traffic problem. This is the opposite issue, where a partition > doesn't get enough traffic to advance stream time and results remain "stuck" > in the suppression buffers. 
We can provide some mechanism to force the > advancement of time across all partitions, for use in testing when you want > to flush out all results, or in production when some topic is low volume. We > shouldn't consider tracking time _more_ granularly until this problem is > solved, since it would just make the low-traffic problem worse. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
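The per-key tracking with a bounded cache discussed in the comment above can be sketched as follows. This is a Python illustration, not Streams code; the class name, cache capacity, and the "fall back to the record's own timestamp on a cache miss" rule are assumptions taken from the comment, not an actual design.

```python
from collections import OrderedDict

class PerKeyStreamTime:
    """Track stream time per key with a bounded LRU cache.

    On a cache miss (key never seen, or evicted), fall back to the
    incoming record's timestamp as that key's partition time.
    """
    def __init__(self, capacity):
        self.capacity = capacity
        self.times = OrderedDict()  # key -> highest timestamp observed

    def observe(self, key, ts):
        if key in self.times:
            self.times.move_to_end(key)            # mark as recently used
            self.times[key] = max(self.times[key], ts)  # time never goes backwards
        else:
            if len(self.times) >= self.capacity:
                self.times.popitem(last=False)     # evict least recently used key
            self.times[key] = ts                   # miss: approximate with record ts
        return self.times[key]

tracker = PerKeyStreamTime(capacity=2)
# A's "24 hours" of data, then B's batch with timestamps in A's past:
for key, ts in [("A", 0), ("A", 1), ("A", 2), ("A", 3),
                ("B", 0), ("B", 1), ("B", 2), ("B", 3)]:
    print(key, ts, tracker.observe(key, ts))
```

Unlike the partition-level reckoning in the {noformat} example above, B's stream time here advances 0 through 3 on its own clock instead of staying pinned at A's high-water mark, so B's records would not be considered late.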
[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906674#comment-16906674 ] Richard Yu edited comment on KAFKA-8769 at 8/13/19 10:29 PM: - [~vvcephei] On the issue regarding low traffic: would this mechanism necessarily be a public API, and what could it look like? Just wondering if this requires a KIP. As for the scalability of the key-based partition-time tracking, some thoughts at the moment: we don't have to store _all_ the keys and their timestamps. That really would be wasteful if a significant portion of them is rarely used at any particular time. Instead, we could use some sort of cache (maybe LFU or LRU) which could store the most popular keys at a certain period in a StreamTask's lifetime. was (Author: yohan123): [~vvcephei] On the issue regarding low traffic: would this mechanism necessarily be a public API, and what could it look like? Just wondering if this requires a KIP. As for the scalability of the key-based partition-time tracking, some thoughts at the moment: we don't have to store _all_ the keys and their timestamps. That really would be wasteful if a significant portion of them is rarely used at any particular time. Instead, we could use some sort of cache (maybe LFU or LRU) which could store the most popular keys at a certain period in time. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906674#comment-16906674 ] Richard Yu commented on KAFKA-8769: --- [~vvcephei] On the issue regarding low traffic: would this mechanism necessarily be a public API, and what could it look like? Just wondering if this requires a KIP. As for the scalability of the key-based partition-time tracking, some thoughts at the moment: we don't have to store _all_ the keys and their timestamps. That really would be wasteful if a significant portion of them is rarely used at any particular time. Instead, we could use some sort of cache (maybe LFU or LRU) which could store the most popular keys at a certain period in time. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16905747#comment-16905747 ] Richard Yu commented on KAFKA-8522: --- [~jagsancio] any thoughts? > Tombstones can survive forever > -- > > Key: KAFKA-8522 > URL: https://issues.apache.org/jira/browse/KAFKA-8522 > Project: Kafka > Issue Type: Improvement > Components: log cleaner >Reporter: Evelyn Bayes >Priority: Minor > > This is a bit of a grey zone as to whether it's a "bug", but it is certainly > unintended behaviour. > > Under specific conditions tombstones effectively survive forever: > * Small amount of throughput; > * min.cleanable.dirty.ratio near or at 0; and > * Other parameters at default. > What happens is all the data continuously gets cycled into the oldest > segment. Old records get compacted away, but the new records continuously > update the timestamp of the oldest segment, resetting the countdown for > deleting tombstones. > So tombstones build up in the oldest segment forever. > > While you could "fix" this by reducing the segment size, this can be > undesirable as a sudden change in throughput could cause a dangerous number > of segments to be created. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
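The failure mode in the description above is easy to model: as long as every cleaning pass refreshes the oldest segment's last-modified time, the tombstone-deletion countdown never elapses. A toy Python simulation with made-up numbers, not broker code:

```python
# A tombstone becomes removable only once
#   now - segment_last_modified > retention window.
retention_ms = 10
last_modified = 0
ever_removable = False

for now in range(0, 100, 5):
    # Low throughput keeps cycling new records into the oldest segment,
    # refreshing its last-modified time on every pass.
    last_modified = now
    if now - last_modified > retention_ms:
        ever_removable = True

print("tombstone ever removable:", ever_removable)
```

Because the countdown's reference point keeps being reset to "now", the condition can never fire, which is exactly why the tombstones accumulate in the oldest segment.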
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16905377#comment-16905377 ] Richard Yu commented on KAFKA-8522: --- [~junrao] A thought just occurred to me. Wouldn't we need to provide an upgrade path if we wish to deprecate the old checkpoint file system? If so, then how would we go about implementing it? I'm not completely sure if we need one at the moment, but I'd think we do. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904814#comment-16904814 ] Richard Yu commented on KAFKA-8522: --- Okay, solved the above problem. Will start with reorganizing the checkpoint files into separate ones for individual partitions. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904804#comment-16904804 ] Richard Yu commented on KAFKA-8522: --- Alright, so I have found the necessary logic for the checkpoint files and the classes in which it is managed (LogCleaner and LogManager). The current problem I have is figuring out where the cleaning offsets are committed to the checkpoint files. Still looking into it at the moment. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16902209#comment-16902209 ] Richard Yu commented on KAFKA-8522: --- [~junrao] You mentioned in a previous post that we should write to a checkpoint file. Should we create a new checkpoint file explicitly for storing these offsets and cleaning times? Or should it be embedded in a preexisting checkpoint file? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned
[ https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16899331#comment-16899331 ] Richard Yu commented on KAFKA-4545: --- Sure, I will assign this issue to myself then. I will get around to it soon; I just have a pending PR that needs to be merged to trunk first. [~junrao] Thanks for the heads-up. We should probably add a needs-kip label. > tombstone needs to be removed after delete.retention.ms has passed after it > has been cleaned > > > Key: KAFKA-4545 > URL: https://issues.apache.org/jira/browse/KAFKA-4545 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0 >Reporter: Jun Rao >Assignee: Jose Armando Garcia Sancio >Priority: Major > > The algorithm for removing the tombstone in a compacted log is supposed to be the > following. > 1. Tombstone is never removed when it's still in the dirty portion of the log. > 2. After the tombstone is in the cleaned portion of the log, we further delay > the removal of the tombstone by delete.retention.ms since the time the > tombstone is in the cleaned portion. > Once the tombstone is in the cleaned portion, we know there can't be any > message with the same key before the tombstone. Therefore, for any consumer, > if it reads a non-tombstone message before the tombstone, but can read to the > end of the log within delete.retention.ms, it's guaranteed to see the > tombstone. > However, the current implementation doesn't seem correct. We delay the > removal of the tombstone by delete.retention.ms since the last modified time > of the last cleaned segment. However, the last modified time is inherited > from the original segment, which could be arbitrarily old. So, the tombstone > may not be preserved as long as it needs to be. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
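The bug in the last paragraph of the description can be shown with two made-up timestamps: the deletion deadline should be measured from when the tombstone enters the cleaned portion, not from the inherited last-modified time. A small Python arithmetic sketch (all numbers are illustrative):

```python
delete_retention_ms = 1_000
inherited_last_modified = 0         # from the original, arbitrarily old segment
entered_cleaned_portion_at = 5_000  # when the tombstone actually became "cleaned"

# Buggy reckoning: countdown starts at the inherited last-modified time.
buggy_deadline = inherited_last_modified + delete_retention_ms

# Intended reckoning: countdown starts when the tombstone enters the
# cleaned portion, preserving the delete.retention.ms consumer guarantee.
intended_deadline = entered_cleaned_portion_at + delete_retention_ms

print("buggy deadline:", buggy_deadline)        # already in the past
print("intended deadline:", intended_deadline)
print("removable the instant it is cleaned:",
      buggy_deadline < entered_cleaned_portion_at)
```

Under the buggy reckoning the tombstone is overdue for removal the moment it is cleaned, so a consumer reading within delete.retention.ms may never see it.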
[jira] [Updated] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned
[ https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-4545: -- Labels: needs-kip (was: ) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Assigned] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned
[ https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-4545: - Assignee: Richard Yu (was: Jose Armando Garcia Sancio) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned
[ https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895651#comment-16895651 ] Richard Yu commented on KAFKA-4545: --- [~jagsancio] Are you still working on this? Not too sure, because it doesn't appear that much work has been done on it since the ticket was created. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16886516#comment-16886516 ] Richard Yu commented on KAFKA-8522: --- [~junrao] Would you mind if I tackle this issue? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885795#comment-16885795 ] Richard Yu edited comment on KAFKA-7711 at 7/16/19 3:01 AM: [~hachikuji] I think I've seen this issue come up multiple times in the past. Not sure exactly which issue numbers. Do you think this is an issue that really needs to be tackled? Edit: Got it somewhat mixed up. There had been issues with Kafka's send API in the past, for example KAFKA-6705, but not the flush() API. It might be good to have something like it though. was (Author: yohan123): [~hachikuji] I think I've seen this issue come up multiple times in the past. Not sure exactly which issue numbers. Do you think this is an issue that really needs to be tackled? > Add a bounded flush() API to Kafka Producer > > > Key: KAFKA-7711 > URL: https://issues.apache.org/jira/browse/KAFKA-7711 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: kun du >Priority: Minor > Labels: needs-kip > > Currently the call to Producer.flush() can hang there for an indeterminate > time. > It would be a good idea to add a bounded flush() API that times out if the producer is > unable to flush all the batched records in a limited time. In this way the > caller of flush() has a chance to decide what to do next instead of just waiting > forever. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885795#comment-16885795 ] Richard Yu commented on KAFKA-7711: --- [~hachikuji] I think I've seen this issue come up multiple times in the past. Not sure exactly which issue numbers. Do you think this is an issue that really needs to be tackled? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
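One way to sketch the bounded flush requested in the ticket is to run the unbounded call on a helper thread and cap only the caller's wait. This is a Python illustration of the pattern, not the proposed Producer API; the real change would presumably be a `flush(timeout)` overload, and note that the underlying flush keeps running after the caller gives up.

```python
import concurrent.futures
import time

def unbounded_flush():
    """Stand-in for Producer.flush(), which can block indefinitely."""
    time.sleep(0.05)
    return "flushed"

def bounded_flush(timeout_s):
    # Bound the caller's wait; the flush itself is not cancelled, so this
    # only gives the caller a chance to decide what to do next.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(unbounded_flush)
        try:
            return future.result(timeout=timeout_s)
        except concurrent.futures.TimeoutError:
            return None  # timed out: caller can retry, log, or fail fast

print(bounded_flush(1.0))    # completes within the bound
print(bounded_flush(0.001))  # None: caller gave up waiting
```

One caveat with this wrapper approach (and a reason a native timeout parameter is preferable): the executor's shutdown still waits for the in-flight flush to finish, so resources are only reclaimed once the underlying call returns.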
[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none
[ https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884826#comment-16884826 ] Richard Yu commented on KAFKA-8650: --- [~mjsax] Do you think this ticket is something that the Kafka community would need within the next release? It would be good to get a gauge on this issue's priority. > Streams does not work as expected with auto.offset.reset=none > - > > Key: KAFKA-8650 > URL: https://issues.apache.org/jira/browse/KAFKA-8650 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Raman Gupta >Priority: Major > Labels: needs-kip > > The auto.offset.reset policy of none is useful as a safety measure, > especially when > * exactly-once processing is desired, or > * at-least-once is desired, but it is expensive to reprocess from the > beginning. > In this case, using "none" forces the ops team to explicitly set the offset > before the stream can re-start processing, in the (hopefully rare) situations > in which the stream consumer offset has been lost for some reason, or in the > case of a new stream that should not start processing from the beginning or > the end, but somewhere in the middle (this scenario might occur during topic > migrations). > Kafka Streams really only supports auto.offset.reset of earliest or latest > (see the `Topology.AutoOffsetReset` enum). It is also possible to use the > auto.offset.reset configuration value, but this works suboptimally because if > the streams application reset tool is used (even with a specific offset > specified), the offset is set for the input topic, but it is not, and cannot > be, set for the internal topics, which won't exist yet. > The internal topics are created by Kafka Streams at startup time, but because > the auto.offset.reset policy of "none" is passed to the consumer of those > internal topics, the Kafka stream fails to start with a > "NoOffsetForPartitionException".
> Proposals / options: > 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it > affects the input topics, but not the internal topics. > 2) Allow streams to be configured with auto.offset.reset=none, but explicitly > set the offset to 0 for newly created internal topics. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8658) A way to configure the jmx rmi port
[ https://issues.apache.org/jira/browse/KAFKA-8658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884810#comment-16884810 ] Richard Yu commented on KAFKA-8658: --- Alright, so here is the PR for this issue. [https://github.com/apache/kafka/pull/7088] > A way to configure the jmx rmi port > --- > > Key: KAFKA-8658 > URL: https://issues.apache.org/jira/browse/KAFKA-8658 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 1.0.0 > Environment: Centos 7 >Reporter: Agostino Sarubbo >Priority: Minor > > Hello, > I'm on kafka-1.0.0 so I'm not sure if it is fixed in the current version. > Atm we are using the following in the service script to use JMX: > Environment=JMX_PORT=7666 > However there is no way to set the jmx_rmi_port. When there is no > specification for jmx_rmi_port the jvm assigns a random port. This > complicates the way we manage the firewall. > Would be great if there is a way to set the jmx_rmi_port in the same way, > e.g.: > Environment=JMX_RMI_PORT=7667 > The variable used during the jvm start is: > -Dcom.sun.management.jmxremote.rmi.port= -- This message was sent by Atlassian JIRA (v7.6.14#76016)
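Until the linked PR lands, the RMI port can usually be pinned through the JVM flag the reporter mentions. A hypothetical systemd-style sketch, assuming the start scripts pass `KAFKA_JMX_OPTS` through to the JVM (as stock Kafka's `kafka-run-class.sh` does); the port numbers are the ones from the ticket:

```ini
# Illustrative service environment, not an official configuration.
Environment=JMX_PORT=7666
Environment=KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote.rmi.port=7667
```

With both ports fixed, the firewall only needs to allow 7666 and 7667 instead of whatever ephemeral port the JVM picks for RMI.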
[jira] [Commented] (KAFKA-8369) Generate an immutable Map view for generated messages with a map key
[ https://issues.apache.org/jira/browse/KAFKA-8369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879987#comment-16879987 ] Richard Yu commented on KAFKA-8369: --- Hi [~hachikuji], I've done some research into this issue and something has come up which requires some clarification. At the moment, the find() API in ImplicitLinkedHashCollection takes a {{key}} input argument. This {{key}} is used to search for an object in the set with the exact same hash code, and that object is returned. So my question here is: what exactly are the key and value for the immutable map view? I ask this because it isn't clear what key-value pairs exist. The closest I could come up with is the {{key}} input argument being the actual key and the elements stored in the set being the values. Is that the way this issue is to be approached? Not so sure at the moment, so it would be great if I could get your input. :) > Generate an immutable Map view for generated messages with a map key > > > Key: KAFKA-8369 > URL: https://issues.apache.org/jira/browse/KAFKA-8369 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > When using the "mapKey" feature, we get an ImplicitLinkedHashCollection which > can be used like a map using the `find()` API. The benefit of this is > hopefully avoiding a conversion to another type when handled by the broker, > but it is a little cumbersome to work with, so we often end up doing the > conversion anyway. One improvement would be to provide a way to convert this > collection to an immutable Map view so that it is easier to work with > directly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860591#comment-16860591 ] Richard Yu edited comment on KAFKA-8516 at 6/11/19 6:16 AM: Well, this is when we start straying into an area called "consensus algorithms". Kafka's current leader-replica model loosely follows an algorithm referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). If we wish to implement the write permissions part (which looks like a pretty big if), then we would perhaps have to consider something along the lines of EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ). cc [~hachikuji] your thoughts on this? Edit: If implemented correctly, there should be a pretty good performance gain (results in RAFT paper anyways seem to indicate this). was (Author: yohan123): Well, this is when we start straying into an area called "consensus algorithms". Kafka's current leader-replica model closely follows an algorithm referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). If we wish to implement the write permissions part (which looks like a pretty big if), then we would perhaps have to consider something along the lines of EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ). cc [~hachikuji] your thoughts on this? Edit: If implemented correctly, there should be a pretty good performance gain (results in RAFT paper anyways seem to indicate this). > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. 
This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all replicas are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. Processing could continue without > interruption. > The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. > Relevant KIP for read permissions can be found here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860591#comment-16860591 ] Richard Yu edited comment on KAFKA-8516 at 6/11/19 6:15 AM: Well, this is when we start straying into an area called "consensus algorithms". Kafka's current leader-replica model closely follows an algorithm referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). If we wish to implement the write permissions part (which looks like a pretty big if), then we would perhaps have to consider something along the lines of EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ). cc [~hachikuji] your thoughts on this? Edit: If implemented correctly, there should be a pretty good performance gain (results in RAFT paper anyways seem to indicate this). was (Author: yohan123): Well, this is when we start straying into an area called "consensus algorithms". Kafka's current leader-replica model closely follows an algorithm referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). If we wish to implement the write permissions part (which looks like a pretty big if), then we would perhaps have to consider something along the lines of EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ). cc [~hachikuji] your thoughts on this? > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. 
If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all replicas are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. Processing could continue without > interruption. > The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. > Relevant KIP for read permissions can be found here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860591#comment-16860591 ] Richard Yu commented on KAFKA-8516: --- Well, this is when we start straying into an area called "consensus algorithms". Kafka's current leader-replica model closely follows an algorithm referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). If we wish to implement the write permissions part (which looks like a pretty big if), then we would perhaps have to consider something along the lines of EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ). cc [~hachikuji] your thoughts on this? > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all replicas are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. 
Processing could continue without > interruption. > The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. > Relevant KIP for read permissions can be found here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-8516: -- Description: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: * Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all replicas are equal) * Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. Relevant KIP for read permissions can be found here: [KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] was: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. 
If a leader fails, all processing effectively comes to a halt until another leader election. In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: * Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all replicas are equal) * Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. 
Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all replicas are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. Processing could continue without > interruption. > The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. > Relevant KIP for read permissions can be found here: > [KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-8516: -- Description: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: * Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all replicas are equal) * Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. Relevant KIP for read permissions can be found here: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] was: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. 
If a leader fails, all processing effectively comes to a halt until another leader election. In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: * Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all replicas are equal) * Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. Relevant KIP for read permissions can be found here: [KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. 
That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all replicas are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. Processing could continue without > interruption. > The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. > Relevant KIP for read permissions can be found here: > [https://cwiki.
[jira] [Commented] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860350#comment-16860350 ] Richard Yu commented on KAFKA-8516: --- Thanks for the heads up! Will add the link to issue. They are pretty close, but I think that KIP is only covering the addition of read permissions to replicas, not write permissions (i.e. fetches = reads). I think implementing read permissions for all replicas is considerably easier than write permissions (since we have to guarantee consistency). > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all replicas are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. Processing could continue without > interruption. 
> The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-8516: -- Description: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: * Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all replicas are equal) * Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. was: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. 
In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: * Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all partitions are equal) * Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. 
Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all replicas are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. Processing could continue without > interruption. > The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
[ https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-8516: -- Description: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: * Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all partitions are equal) * Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. was: Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. 
In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: - Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all partitions are equal) - Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. > Consider allowing all replicas to have read/write permissions > - > > Key: KAFKA-8516 > URL: https://issues.apache.org/jira/browse/KAFKA-8516 > Project: Kafka > Issue Type: Improvement >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka internals, a leader is responsible for all the read and > write operations requested by the user. This naturally incurs a bottleneck > since one replica, as the leader, would experience a significantly heavier > workload than other replicas and also means that all client commands must > pass through a chokepoint. If a leader fails, all processing effectively > comes to a halt until another leader election. In order to help solve this > problem, we could think about redesigning Kafka core so that any replica is > able to do read and write operations as well. That is, the system be changed > so that _all_ replicas have read/write permissions. > > This has multiple positives. 
Notably the following: > * Workload can be more evenly distributed since leader replicas are weighted > more than follower replicas (in this new design, all partitions are equal) > * Some failures would not be as catastrophic as in the leader-follower > paradigm. There is no one single "leader". If one replica goes down, others > are still able to read/write as needed. Processing could continue without > interruption. > The implementation for such a change like this will be very extensive and > discussion would be needed to decide if such an improvement as described > above would warrant such a drastic redesign of Kafka internals. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
Richard Yu created KAFKA-8516: - Summary: Consider allowing all replicas to have read/write permissions Key: KAFKA-8516 URL: https://issues.apache.org/jira/browse/KAFKA-8516 Project: Kafka Issue Type: Improvement Reporter: Richard Yu Currently, in Kafka internals, a leader is responsible for all the read and write operations requested by the user. This naturally incurs a bottleneck since one replica, as the leader, would experience a significantly heavier workload than other replicas and also means that all client commands must pass through a chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. In order to help solve this problem, we could think about redesigning Kafka core so that any replica is able to do read and write operations as well. That is, the system be changed so that _all_ replicas have read/write permissions. This has multiple positives. Notably the following: - Workload can be more evenly distributed since leader replicas are weighted more than follower replicas (in this new design, all partitions are equal) - Some failures would not be as catastrophic as in the leader-follower paradigm. There is no one single "leader". If one replica goes down, others are still able to read/write as needed. Processing could continue without interruption. The implementation for such a change like this will be very extensive and discussion would be needed to decide if such an improvement as described above would warrant such a drastic redesign of Kafka internals. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857147#comment-16857147 ] Richard Yu commented on KAFKA-8421: --- I think that we could extend this issue to account for a poll() operation which did not begin during a rebalance but extends into one. Basically, we want to break the timeout up so that the consumer can "join" the group; technically the consumer would then have its new assignment, but it would not receive it (in the form of the JoinGroup and SyncGroup responses) until its fetch request is complete. Edit to previous comment: We could also halt any ongoing fetch requests should the rebalance timeout expire, and return the data we already have. Once every fetch request has completed, we would send all JoinGroup responses (in other words, while a fetch request for previously owned partitions is still live, we would hold off on sending any JoinGroup responses, to avoid two consumers requesting records from the same partition). > Allow consumer.poll() to return data in the middle of rebalance > --- > > Key: KAFKA-8421 > URL: https://issues.apache.org/jira/browse/KAFKA-8421 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Priority: Major > > With KIP-429 in place, today when a consumer is about to send a join-group > request its owned partitions may not be empty, meaning that some of its > fetched data can still be returned. Nevertheless, today the logic is strict: > {code} > if (!updateAssignmentMetadataIfNeeded(timer)) { > return ConsumerRecords.empty(); > } > {code} > I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting a consumer still return > messages that belong to its owned partitions even while it is within a > rebalance, because we know it is safe: no one else can claim those > partitions during this rebalance yet, and we can still commit offsets if, after > this rebalance, the partitions need to be revoked. > One thing we need to take care of, though, is the rebalance timeout: while > consumers are processing those records they may not call the next poll() in time > (think: the Kafka Streams num.iterations mechanism), which may lead to the consumer > dropping out of the group during the rebalance.
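The optimization quoted above can be illustrated with a minimal sketch. All names here are hypothetical, not actual Kafka consumer internals: the idea is simply that buffered records are returned during a rebalance only for partitions the consumer still owns, since no other member can claim those partitions before this rebalance completes.

```java
import java.util.*;

// Hypothetical sketch (not actual Kafka code): mid-rebalance, return only
// the buffered records whose partition is still in the owned set.
public class MidRebalanceFetchSketch {
    static Map<String, List<String>> filterReturnable(
            Map<String, List<String>> buffered, Set<String> ownedPartitions) {
        Map<String, List<String>> returnable = new HashMap<>();
        for (Map.Entry<String, List<String>> e : buffered.entrySet()) {
            if (ownedPartitions.contains(e.getKey())) {
                returnable.put(e.getKey(), e.getValue());
            }
        }
        return returnable;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buffered = new HashMap<>();
        buffered.put("topic-0", Arrays.asList("a", "b"));
        buffered.put("topic-1", Arrays.asList("c"));
        // Mid-rebalance: only topic-0 is still owned by this consumer,
        // so only its records may be handed to the caller.
        Map<String, List<String>> out = filterReturnable(
                buffered, new HashSet<>(Collections.singleton("topic-0")));
        System.out.println(out.keySet()); // prints [topic-0]
    }
}
```

The real change would of course live inside the fetcher/coordinator path rather than a standalone filter, but the ownership check is the core of the proposal.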
[jira] [Comment Edited] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088 ] Richard Yu edited comment on KAFKA-8421 at 6/5/19 11:13 PM: [~guozhang] There are a couple of simple approaches we could try to prevent the consumer from dropping out of the group during a rebalance: # We could artificially extend the rebalance timeout to accommodate the time spent in poll, but considering that goes against the design described in KIP-62 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]), I don't think this is much of an option. # What we could otherwise attempt is an early termination of the record fetch request. In this scenario, we would cancel the request on the broker side, but by the time the user could call poll() again, it is possible that the rebalance timeout has expired. What we could do instead is the following: the poll() operation which begins during a rebalance would not block for the entire time allocated by the user; instead, the user timeout would be split into smaller chunks of time, between which the consumer could check for a possible rejoin (i.e. we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is needed, we would send a JoinGroupRequest. On the broker side, we could modify the logic so that any pending fetch requests are respected and continue to completion. The JoinGroup response would be held off for a particular consumer until its fetch request has completed, at which point we would send the JoinGroupResponse (although delayed). I thought that this is a good way of avoiding breaking guarantees that Kafka has in place (particularly with regard to the rebalance timeout). WDYT of this approach? 
was (Author: yohan123): [~guozhang] There are a couple of simple approaches we could try to prevent the consumer from dropping out of the group during a rebalance: # We could artificially extend the rebalance timeout to accommodate the time spent in poll, but considering that goes against the design described in KIP-62 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]), I don't think this is much of an option. # What we could do instead is attempt an early termination of the record fetch request. In this scenario, we would cancel the request on the broker side, but by the time the user could call poll() again, it is possible that the rebalance timeout has expired. What we could do instead is the following: the poll() operation which begins during a rebalance would not block for the entire time allocated by the user; instead, the user timeout would be split into smaller chunks of time, between which the consumer could check for a possible rejoin (i.e. we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is needed, we would send a JoinGroupRequest. On the broker side, we could modify the logic so that any pending fetch requests are respected and continue to completion. The JoinGroup response would be held off for a particular consumer until its fetch request has completed, at which point we would send the JoinGroupResponse (although delayed). I thought that this is a good way of avoiding breaking guarantees that Kafka has in place (particularly with regard to the rebalance timeout). WDYT of this approach? 
[jira] [Comment Edited] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088 ] Richard Yu edited comment on KAFKA-8421 at 6/5/19 9:32 PM: --- [~guozhang] There are a couple of simple approaches we could try to prevent the consumer from dropping out of the group during a rebalance: # We could artificially extend the rebalance timeout to accommodate the time spent in poll, but considering that goes against the design described in KIP-62 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]), I don't think this is much of an option. # What we could do instead is attempt an early termination of the record fetch request. In this scenario, we would cancel the request on the broker side, but by the time the user could call poll() again, it is possible that the rebalance timeout has expired. What we could do instead is the following: the poll() operation which begins during a rebalance would not block for the entire time allocated by the user; instead, the user timeout would be split into smaller chunks of time, between which the consumer could check for a possible rejoin (i.e. we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is needed, we would send a JoinGroupRequest. On the broker side, we could modify the logic so that any pending fetch requests are respected and continue to completion. The JoinGroup response would be held off for a particular consumer until its fetch request has completed, at which point we would send the JoinGroupResponse (although delayed). I thought that this is a good way of avoiding breaking guarantees that Kafka has in place (particularly with regard to the rebalance timeout). WDYT of this approach? 
was (Author: yohan123): [~guozhang] There are a couple of simple approaches we could try to prevent the consumer from dropping out of the group during a rebalance: # We could artificially extend the rebalance timeout to accommodate the time spent in poll, but considering that goes against the design described in KIP-62 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]), I don't think this is much of an option. # What we could do instead is attempt an early termination of the record fetch request. In this scenario, we would cancel the request on the broker side, but by the time the user could call poll() again, it is possible that the rebalance timeout has expired. What we could do instead is the following: the poll() operation which begins during a rebalance would not block for the entire time allocated by the user; instead, the user timeout would be split into smaller chunks of time, between which the consumer could check for a possible rejoin (i.e. we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is needed, we would send a JoinGroupRequest. On the broker side, we could modify the logic so that any pending fetch requests are respected and continue to completion. The JoinGroup response would be held off for a particular consumer until its fetch request has completed, at which point we would send the JoinGroupResponse (although delayed). I thought that this is a good way of avoiding breaking guarantees that Kafka has in place (particularly with regard to the rebalance timeout). WDYT of this approach? 
[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088 ] Richard Yu commented on KAFKA-8421: --- [~guozhang] There are a couple of simple approaches we could try to prevent the consumer from dropping out of the group during a rebalance: # We could artificially extend the rebalance timeout to accommodate the time spent in poll, but considering that goes against the design described in KIP-62 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]), I don't think this is much of an option. # What we could do instead is attempt an early termination of the record fetch request. In this scenario, we would cancel the request on the broker side, but by the time the user could call poll() again, it is possible that the rebalance timeout has expired. What we could do instead is the following: the poll() operation which begins during a rebalance would not block for the entire time allocated by the user; instead, the user timeout would be split into smaller chunks of time, between which the consumer could check for a possible rejoin (i.e. we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is needed, we would send a JoinGroupRequest. On the broker side, we could modify the logic so that any pending fetch requests are respected and continue to completion. The JoinGroup response would be held off for a particular consumer until its fetch request has completed, at which point we would send the JoinGroupResponse (although delayed). I thought that this is a good way of avoiding breaking guarantees that Kafka has in place (particularly with regard to the rebalance timeout). WDYT of this approach? 
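The timeout-splitting idea in the comment above (block in small slices, checking between slices whether a rejoin is needed) can be sketched roughly as follows. Every name here is illustrative; this is not the actual consumer implementation, just the control flow being proposed.

```java
// Hypothetical sketch of "split the user poll timeout into chunks":
// instead of blocking for the full user-supplied timeout, wait in small
// slices, and between slices check whether the group needs a rejoin.
public class ChunkedPollSketch {
    interface RejoinCheck { boolean rejoinNeeded(); }

    // Returns true if a rejoin was detected before the timeout elapsed,
    // letting the caller send a JoinGroupRequest early.
    static boolean pollInChunks(long totalMs, long chunkMs, RejoinCheck check)
            throws InterruptedException {
        long remaining = totalMs;
        while (remaining > 0) {
            if (check.rejoinNeeded()) {
                return true; // stop waiting early
            }
            long slice = Math.min(chunkMs, remaining);
            Thread.sleep(slice); // stand-in for one bounded fetch/wait step
            remaining -= slice;
        }
        return check.rejoinNeeded();
    }

    public static void main(String[] args) throws InterruptedException {
        // e.g. a 50 ms wait split into 5 ms slices, with a rejoin
        // becoming necessary on the third check
        int[] calls = {0};
        boolean rejoined = pollInChunks(50, 5, () -> ++calls[0] >= 3);
        System.out.println(rejoined); // prints true
    }
}
```

The broker-side half of the proposal (holding the JoinGroup response until the consumer's in-flight fetch completes) is not shown; this only illustrates the client-side chunked wait.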
[jira] [Commented] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure
[ https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856999#comment-16856999 ] Richard Yu commented on KAFKA-8438: --- Sure [~bchen225242], will look into it and see what else I can think of. > Add API to allow user to define end behavior of consumer failure > > > Key: KAFKA-8438 > URL: https://issues.apache.org/jira/browse/KAFKA-8438 > Project: Kafka > Issue Type: New Feature > Components: consumer >Reporter: Richard Yu >Priority: Major > Labels: needs-dicussion, needs-kip > > Recently, in a concerted effort to make Kafka's rebalances less painful, > various approaches have been used to reduce the number and impact of > rebalances. Often, the trigger of a rebalance is a failure of some sort or a > thrown exception during processing, in which case the workload will be > redistributed among surviving threads. To reduce rebalances due to > random consumer crashes, a recent change to Kafka internals > (which introduces the concept of static membership) prevents a rebalance > from occurring within {{session.timeout.ms}}, in the hope that the consumer > thread which crashed will recover in that interval and rejoin the group. > However, in some cases, consumer threads go down permanently or > remain dead for long periods of time. In these scenarios, Kafka users > might not become aware of such a crash until hours after it > happened, forcing them to manually start a new KafkaConsumer > process a considerable time after the failure occurred. That is > where the addition of a callback such as {{onConsumerFailure}} would help. > There are multiple use cases for this callback (which is defined by the > user). {{onConsumerFailure}} is called when a particular consumer thread goes > down for some specified time interval (i.e. a config called > {{acceptable.consumer.failure.timeout.ms}}). 
> When called, this method could be used to log a consumer failure or, should > the user wish it, to create a new thread which would then rejoin the consumer > group (which could also include the required {{group.instance.id}} so that a > rebalance wouldn't be re-triggered; we would need to think about that). > Should the old thread recover and attempt to rejoin the consumer group (with > the substitute thread being part of the group), the old thread will be denied > access and an exception will be thrown (to indicate that another process has > already taken its place).
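A sketch of what the proposed hook might look like. Note that {{onConsumerFailure}} and {{acceptable.consumer.failure.timeout.ms}} are proposals from this ticket, not an existing Kafka API, and every name below is hypothetical.

```java
// Hypothetical sketch of the proposed onConsumerFailure callback: a
// user-defined handler invoked once a consumer thread has been down
// longer than some acceptable-failure timeout. Not an actual Kafka API.
public class ConsumerFailureSketch {
    interface ConsumerFailureListener {
        void onConsumerFailure(String memberId, long downForMs);
    }

    static void checkFailure(String memberId, long lastHeartbeatMs, long nowMs,
                             long acceptableFailureTimeoutMs,
                             ConsumerFailureListener listener) {
        long downFor = nowMs - lastHeartbeatMs;
        if (downFor > acceptableFailureTimeoutMs) {
            // The user's handler might log the failure, or spin up a
            // replacement thread that rejoins with the same group.instance.id.
            listener.onConsumerFailure(memberId, downFor);
        }
    }

    public static void main(String[] args) {
        // Last heartbeat at t=0, now t=30s, acceptable downtime 10s:
        // the callback fires.
        checkFailure("consumer-1", 0, 30_000, 10_000,
                (id, ms) -> System.out.println(id + " down for " + ms + " ms"));
    }
}
```

The interesting design questions from the ticket (who runs this check, and how the replacement thread avoids triggering another rebalance) are deliberately left out of the sketch.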
[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853099#comment-16853099 ] Richard Yu commented on KAFKA-8421: --- A key issue here is deciding when we stop sending the old records of a consumer's assignment to the user. What we could consider doing is, whenever we are in a rebalance, first checking whether the MemberState is still rebalancing before sending. If the MemberState is no longer rebalancing, then we just abort any records we meant to send. onPartitionsRevoked/onPartitionsLost could also be a viable way to abort an ongoing poll operation for old records.
[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852643#comment-16852643 ] Richard Yu commented on KAFKA-8421: --- Might take a crack at this one. It certainly is an interesting issue. :)
[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852428#comment-16852428 ] Richard Yu commented on KAFKA-3539: --- [~radai] I don't know whether this problem has been resolved. Issue 6705 was closed because the changes required to fix this behavioral issue were thought to be too complex. > KafkaProducer.send() may block even though it returns the Future > > > Key: KAFKA-3539 > URL: https://issues.apache.org/jira/browse/KAFKA-3539 > Project: Kafka > Issue Type: Bug > Components: producer >Reporter: Oleg Zhurakousky >Priority: Critical > Labels: needs-discussion, needs-kip > > You can get more details from the us...@kafka.apache.org by searching on the > thread with the subject "KafkaProducer block on send". > The bottom line is that a method that returns a Future must never block, since that > essentially violates the Future contract: it was specifically designed to > return immediately, passing control back to the user to check for completion, > cancel, etc.
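To illustrate the contract at stake, here is a generic sketch (not KafkaProducer code): a method that returns a Future should hand the work off and return immediately, so the caller decides whether and when to block.

```java
import java.util.concurrent.*;

// Generic sketch of a Future-returning "send" that never blocks the caller:
// work is handed to a background executor and a Future comes back at once.
public class NonBlockingSendSketch {
    final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Returns immediately; the caller may get(), poll with a timeout, or cancel.
    public Future<String> send(String record) {
        return executor.submit(() -> {
            // stand-in for network I/O / batching work
            return "ack:" + record;
        });
    }

    public static void main(String[] args) throws Exception {
        NonBlockingSendSketch producer = new NonBlockingSendSketch();
        Future<String> f = producer.send("hello");      // does not block
        System.out.println(f.get(1, TimeUnit.SECONDS)); // caller opts into waiting
        producer.executor.shutdown();
    }
}
```

The complaint in KAFKA-3539 is precisely that the real send() can block before returning its Future (e.g. while fetching metadata or when the buffer is full), which defeats the purpose of the asynchronous signature shown here.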