[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2020-11-12 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231123#comment-17231123
 ] 

Richard Yu commented on KAFKA-10575:


Thanks for letting me know! I was already looking at StoreChangelogReader since 
that was probably one of only two places where onRestoreEnd was called. 
Hopefully, I will be able to pull together a PR that can tackle this issue.
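
For reference, the user-side contract involved looks roughly like the sketch below. Only the StateRestoreListener interface and KafkaStreams#setGlobalStateRestoreListener are actual Streams API; the class itself is illustrative.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Illustrative sketch of a listener that assumes onRestoreEnd is always fired.
// If the callback is skipped when a restoring task is closed (e.g. migrated to
// another client), bookkeeping like the map below is never cleaned up.
public class RestoreProgressListener implements StateRestoreListener {
    private final Map<TopicPartition, Long> restoredSoFar = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        restoredSoFar.put(partition, 0L);
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        restoredSoFar.merge(partition, numRestored, Long::sum);
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        // Today this is only reached when the task transitions to RUNNING.
        restoredSoFar.remove(partition);
    }
}
{code}

Such a listener is registered via KafkaStreams#setGlobalStateRestoreListener, which is why a skipped onRestoreEnd call is directly visible to users.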

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transition it to the running state. 
> However, the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback, indicating its progress, whenever the restoration 
> is stopped in any scenario.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2020-11-09 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228890#comment-17228890
 ] 

Richard Yu commented on KAFKA-10575:


[~guozhang] I'm interested in picking this one up. May I try my hand at it?

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transition it to the running state. 
> However, the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback, indicating its progress, whenever the restoration 
> is stopped in any scenario.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-06-16 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136786#comment-17136786
 ] 

Richard Yu edited comment on KAFKA-8770 at 6/16/20, 4:25 PM:
-

[~gyammine] Thanks for the comment! I apologize for the hiatus in tackling 
this issue. I am planning on getting more of this KIP implemented.

 [~vvcephei] I will ping you once the PR is in a good state. :)


was (Author: yohan123):
[~gyammine] Thanks for the comment! I apologize for the hiatus when tackling 
this issue. I am planning on tackling this next. Hope we can get it into the 
next release.

[~vvcephei] Will ping you once the PR for FK-join is ready for review. :)

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations and for external systems that consume the 
> results and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this by loading the prior 
> result before a stateful operation and comparing it with the new result 
> before persisting or forwarding. In many cases, we load the prior result 
> anyway, so the performance impact may not be significant either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.
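
To make the comparison step described in the issue concrete, here is a rough sketch in user-level Processor API terms. It is purely illustrative: the store name and class are made up, and the actual change would live inside the Streams internals rather than in a user processor.

{code:java}
import java.util.Objects;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch of emit-on-change: load the prior result, compare it with the new
// result, and forward only if something actually changed.
public class EmitOnChangeProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, String>) context.getStateStore("results-store");
    }

    @Override
    public void process(final String key, final String newResult) {
        final String priorResult = store.get(key);   // often loaded anyway
        if (Objects.equals(priorResult, newResult)) {
            return;                                  // no-op update: suppress it
        }
        store.put(key, newResult);
        context().forward(key, newResult);           // forward only real changes
    }
}
{code}

The timestamp question raised in the description (whether a suppressed no-op at time 2 should still advance the result's timestamp) is deliberately left out of this sketch.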



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-06-16 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136786#comment-17136786
 ] 

Richard Yu commented on KAFKA-8770:
---

[~gyammine] Thanks for the comment! I apologize for the hiatus in tackling 
this issue. I am planning on tackling this next. Hope we can get it into the 
next release.

[~vvcephei] Will ping you once the PR for FK-join is ready for review. :)

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations and for external systems that consume the 
> results and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this by loading the prior 
> result before a stateful operation and comparing it with the new result 
> before persisting or forwarding. In many cases, we load the prior result 
> anyway, so the performance impact may not be significant either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9808) Refactor State Store Hierarchy

2020-04-03 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074752#comment-17074752
 ] 

Richard Yu commented on KAFKA-9808:
---

My assumption is that to make this refactoring more doable, we can probably 
break the issue down and tackle the basic store types one at a time:
 # KeyValueStore
 # SessionStore
 # WindowStore
 # SegmentedBytesStore

Not all of them, I suppose, would require extensive cleanup, but it's good to 
keep in mind.
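
For a rough picture of why the scope is so large, here is a simplified, self-contained sketch of the kind of wrapping involved. The interface and class names are stand-ins rather than the actual Streams classes; the point is that adding one method to the store interface means touching every layer.

{code:java}
import java.util.HashMap;
import java.util.Map;

// A logical store is a stack of wrappers over an inner bytes store, so every
// new interface method has to be threaded through each layer.
interface SimpleKeyValueStore {
    byte[] get(String key);
    void put(String key, byte[] value);
}

class InMemoryBytesStore implements SimpleKeyValueStore {
    private final Map<String, byte[]> data = new HashMap<>();
    public byte[] get(String key) { return data.get(key); }
    public void put(String key, byte[] value) { data.put(key, value); }
}

class ChangeLoggingLayer implements SimpleKeyValueStore {
    private final SimpleKeyValueStore inner;
    ChangeLoggingLayer(SimpleKeyValueStore inner) { this.inner = inner; }
    public byte[] get(String key) { return inner.get(key); }
    public void put(String key, byte[] value) {
        // would also append to the changelog topic here
        inner.put(key, value);
    }
}

class MeteredLayer implements SimpleKeyValueStore {
    private final SimpleKeyValueStore inner;
    MeteredLayer(SimpleKeyValueStore inner) { this.inner = inner; }
    public byte[] get(String key) { return inner.get(key); }             // + metrics
    public void put(String key, byte[] value) { inner.put(key, value); } // + metrics
}

// e.g. new MeteredLayer(new ChangeLoggingLayer(new InMemoryBytesStore()))
{code}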

> Refactor State Store Hierarchy
> --
>
> Key: KAFKA-9808
> URL: https://issues.apache.org/jira/browse/KAFKA-9808
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Richard Yu
>Priority: Major
> Attachments: Current State Store Hierarchy.pdf
>
>
> Over years of development, Kafka contributors have been adding more and more 
> state store classes on top of each other without much regard for making them 
> approachable for future modification. For instance, it has become 
> increasingly difficult to add new APIs to state store classes while, at the 
> same time, preventing them from being exposed to users. 
> In sum, the entire hierarchy is slowly spiraling out of control, and there is 
> a growing need to consolidate the multiple state store types into a few more 
> manageable ones for future Kafka developers. 
> Note: There have already been a couple of attempts to simplify the state 
> store hierarchy; the task isn't too complex, but it's the enormous scope of 
> the change that makes things difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9808) Refactor State Store Hierarchy

2020-04-03 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9808:
--
Attachment: Current State Store Hierarchy.pdf

> Refactor State Store Hierarchy
> --
>
> Key: KAFKA-9808
> URL: https://issues.apache.org/jira/browse/KAFKA-9808
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Richard Yu
>Priority: Major
> Attachments: Current State Store Hierarchy.pdf
>
>
> Over years of development, Kafka contributors have been adding more and more 
> state store classes on top of each other without much regard for making them 
> approachable for future modification. For instance, it has become 
> increasingly difficult to add new APIs to state store classes while, at the 
> same time, preventing them from being exposed to users. 
> In sum, the entire hierarchy is slowly spiraling out of control, and there is 
> a growing need to consolidate the multiple state store types into a few more 
> manageable ones for future Kafka developers. 
> Note: There have already been a couple of attempts to simplify the state 
> store hierarchy; the task isn't too complex, but it's the enormous scope of 
> the change that makes things difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9808) Refactor State Store Hierarchy

2020-04-02 Thread Richard Yu (Jira)
Richard Yu created KAFKA-9808:
-

 Summary: Refactor State Store Hierarchy
 Key: KAFKA-9808
 URL: https://issues.apache.org/jira/browse/KAFKA-9808
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Richard Yu


Over years of development, Kafka contributors have been adding more and more 
state store classes on top of each other without much regard for making them 
approachable for future modification. For instance, it has become increasingly 
difficult to add new APIs to state store classes while, at the same time, 
preventing them from being exposed to users. 

In sum, the entire hierarchy is slowly spiraling out of control, and there is a 
growing need to consolidate the multiple state store types into a few more 
manageable ones for future Kafka developers. 

Note: There have already been a couple of attempts to simplify the state store 
hierarchy; the task isn't too complex, but it's the enormous scope of the 
change that makes things difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9347) Detect deleted log directory before becoming leader

2020-03-29 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070549#comment-17070549
 ] 

Richard Yu commented on KAFKA-9347:
---

[~hachikuji] Can I pick this one up?

> Detect deleted log directory before becoming leader
> ---
>
> Key: KAFKA-9347
> URL: https://issues.apache.org/jira/browse/KAFKA-9347
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-discussion
>
> There is currently no protection to prevent a broker that has had its log 
> directory deleted from becoming the leader of a partition that it still 
> remains in the ISR of. This scenario can happen when the last remaining 
> replica in the ISR is shut down. It will remain in the ISR and be eligible 
> for leadership as soon as it starts up. It would be useful to detect this 
> situation dynamically in order to force the user to either do an unclean 
> election or recover another broker. One option might be just to pass a flag 
> on startup to specify that a broker should not be eligible for leadership. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-22 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064328#comment-17064328
 ] 

Richard Yu commented on KAFKA-9733:
---

[~bchen225242] Do you know how much use this would have for Kafka? The design 
and implementation for this issue would no doubt be horrendously complex, so 
what's your take? 

> Consider addition of leader quorum in replication model
> ---
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Kafka's current replication model (with its single leader and several 
> followers) is somewhat similar to the consensus algorithms used in databases 
> (RAFT), with the major difference being the existence of the ISR. 
> Consequently, Kafka suffers from the same fault-tolerance issues as other 
> distributed systems that rely on RAFT: the leader tends to be the chokepoint 
> for failures, i.e., if it goes down, there will be a brief stop-the-world 
> effect. 
> In contrast, giving all replicas the power to read from and write to other 
> replicas is also difficult to accomplish (as emphasized by the complexity of 
> the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
> such an algorithm, and it offers very little gain compared to the overhead. 
> Therefore, I propose an intermediate plan between these two algorithms: the 
> leader replica quorum. In essence, there will be multiple leaders (which have 
> the power for both reads and writes), but the number of leaders will not be 
> excessive (i.e., maybe three at most). How we achieve consistency is simple:
>  * Any leader has the power to propose a write update to other replicas. But 
> before passing a write update to a follower, the other leaders must vote on 
> whether such an operation is granted.
>  * In principle, a leader will propose a write update to the other leaders, 
> and once the other leaders have integrated that write update into their 
> version of the stored data, they will also give the green light. 
>  * If, say, more than half of the other leaders have agreed that the current 
> change is good to go, then we can forward the change downstream to the other 
> replicas.
>  The algorithm for maintaining consistency between multiple leaders will 
> still have to be worked out in detail. However, there would be multiple gains 
> from this design over the old model:
>  # The single-leader failure bottleneck is alleviated to a certain extent, 
> since there are now multiple leader replicas.
>  # Write updates will potentially no longer be bottlenecked at a single 
> leader (since there are multiple leaders available). On a related note, there 
> has been a KIP that allows clients to read from non-leader replicas. (Will 
> add the KIP link soon.)
> Some might note that the overhead from maintaining consistency among multiple 
> leaders might offset these gains. That might be true with a large number of 
> leaders, but with a small number (capped at 3, as mentioned above), the 
> overhead will also be correspondingly small. (How latency will be affected is 
> unknown until further testing, but more than likely, this option will be 
> configurable depending on user requirements.)
>  
>  
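
To make the acceptance rule above concrete, here is a minimal sketch of the proposal flow in plain Java. It is entirely hypothetical: none of these classes exist in Kafka, and a real design would have to deal with concurrent proposals, ordering, failure detection, and the interaction with the ISR.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a leader forwards a write downstream only after more than
// half of the other leaders have acknowledged (i.e. integrated) it.
public class QuorumLeader {
    private final List<QuorumLeader> coLeaders = new ArrayList<>();
    private final Map<String, byte[]> data = new HashMap<>();

    public void addCoLeader(QuorumLeader peer) {
        coLeaders.add(peer);
    }

    /** A peer leader integrates the proposed update and gives the green light. */
    public boolean acknowledge(String key, byte[] value) {
        data.put(key, value);
        return true;
    }

    /** Propose a write; forward downstream only if a majority of the other leaders agree. */
    public boolean proposeWrite(String key, byte[] value) {
        int peerAcks = 0;
        for (QuorumLeader peer : coLeaders) {
            if (peer.acknowledge(key, value)) {
                peerAcks++;
            }
        }
        if (peerAcks * 2 > coLeaders.size()) {   // strict majority of the other leaders
            data.put(key, value);
            forwardToFollowers(key, value);      // normal follower replication from here
            return true;
        }
        return false;                            // not enough agreement: do not expose the write
    }

    private void forwardToFollowers(String key, byte[] value) {
        // placeholder for the usual leader -> follower replication path
    }
}
{code}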



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-19 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the consensus algorithms used in databases 
(RAFT), with the major difference being the existence of the ISR. Consequently, 
Kafka suffers from the same fault-tolerance issues as other distributed systems 
that rely on RAFT: the leader tends to be the chokepoint for failures, i.e., if 
it goes down, there will be a brief stop-the-world effect. 

In contrast, giving all replicas the power to read from and write to other 
replicas is also difficult to accomplish (as emphasized by the complexity of 
the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
such an algorithm, and it offers very little gain compared to the overhead. 

Therefore, I propose an intermediate plan between these two algorithms: the 
leader replica quorum. In essence, there will be multiple leaders (which have 
the power for both reads and writes), but the number of leaders will not be 
excessive (i.e., maybe three at most). How we achieve consistency is simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must vote on 
whether such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If, say, more than half of the other leaders have agreed that the current 
change is good to go, then we can forward the change downstream to the other 
replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single-leader failure bottleneck is alleviated to a certain extent, 
since there are now multiple leader replicas.
 # Write updates will potentially no longer be bottlenecked at a single leader 
(since there are multiple leaders available). On a related note, there has been 
a KIP that allows clients to read from non-leader replicas. (Will add the KIP 
link soon.)

Some might note that the overhead from maintaining consistency among multiple 
leaders might offset these gains. That might be true with a large number of 
leaders, but with a small number (capped at 3, as mentioned above), the 
overhead will also be correspondingly small. (How latency will be affected is 
unknown until further testing, but more than likely, this option will be 
configurable depending on user requirements.)

 

 

  was:
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate plan in between these two 
algorithms, and that is the leader replica quorum. In essence, there will be 
multiple leaders (which have the power for both read and writes), but the 
number of leaders will not be excessive (i.e. maybe three or five at max). How 
we achieve consistency is simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must elect if 
such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If say, more than half the other leaders have agreed that the current change 
is good to go, then we can forward the change downstream to the other replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single leader failure bottleneck has been alleviated to a certain 
extent, since there are now multiple leader replicas.
 # Write updates will potentially no longer be bottlenecked at one single 
leader (since there are mul

[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the consensus algorithms used in databases 
(RAFT), with the major difference being the existence of the ISR. Consequently, 
Kafka suffers from the same fault-tolerance issues as other distributed systems 
that rely on RAFT: the leader tends to be the chokepoint for failures, i.e., if 
it goes down, there will be a brief stop-the-world effect. 

In contrast, giving all replicas the power to read from and write to other 
replicas is also difficult to accomplish (as emphasized by the complexity of 
the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
such an algorithm, and it offers very little gain compared to the overhead. 

Therefore, I propose an intermediate plan between these two algorithms: the 
leader replica quorum. In essence, there will be multiple leaders (which have 
the power for both reads and writes), but the number of leaders will not be 
excessive (i.e., maybe three or five at most). How we achieve consistency is 
simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must vote on 
whether such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If, say, more than half of the other leaders have agreed that the current 
change is good to go, then we can forward the change downstream to the other 
replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single-leader failure bottleneck is alleviated to a certain extent, 
since there are now multiple leader replicas.
 # Write updates will no longer be bottlenecked at a single leader (since 
there are multiple leaders available). On a related note, there has been a KIP 
that allows clients to read from non-leader replicas. (Will add the KIP link 
soon.)

Some might note that the overhead from maintaining consistency among multiple 
leaders might offset these gains. That might be true with a large number of 
leaders, but with a small number (capped at 3 or 4, as mentioned above), the 
overhead will also be correspondingly small. (How latency will be affected is 
unknown until further testing, but more than likely, this option will be 
configurable depending on user requirements.)

 

 

  was:
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate plan in between these two 
algorithms, and that is the leader replica quorum. In essence, there will be 
multiple leaders (which have the power for both read and writes), but the 
number of leaders will not be excessive (i.e. maybe three or four at max). How 
we achieve consistency is simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must elect if 
such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If say, more than half the other leaders have agreed that the current change 
is good to go, then we can forward the change downstream to the other replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single leader failure bottleneck has been alleviated to a certain 
extent, since there are now multiple leader replicas.
 # Write updates will no longer be bottlenecked at one single leader (since 
there are multiple leade

[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the consensus algorithms used in databases 
(RAFT), with the major difference being the existence of the ISR. Consequently, 
Kafka suffers from the same fault-tolerance issues as other distributed systems 
that rely on RAFT: the leader tends to be the chokepoint for failures, i.e., if 
it goes down, there will be a brief stop-the-world effect. 

In contrast, giving all replicas the power to read from and write to other 
replicas is also difficult to accomplish (as emphasized by the complexity of 
the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
such an algorithm, and it offers very little gain compared to the overhead. 

Therefore, I propose an intermediate plan between these two algorithms: the 
leader replica quorum. In essence, there will be multiple leaders (which have 
the power for both reads and writes), but the number of leaders will not be 
excessive (i.e., maybe three or five at most). How we achieve consistency is 
simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must vote on 
whether such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If, say, more than half of the other leaders have agreed that the current 
change is good to go, then we can forward the change downstream to the other 
replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single-leader failure bottleneck is alleviated to a certain extent, 
since there are now multiple leader replicas.
 # Write updates will potentially no longer be bottlenecked at a single leader 
(since there are multiple leaders available). On a related note, there has been 
a KIP that allows clients to read from non-leader replicas. (Will add the KIP 
link soon.)

Some might note that the overhead from maintaining consistency among multiple 
leaders might offset these gains. That might be true with a large number of 
leaders, but with a small number (capped at 3 or 5, as mentioned above), the 
overhead will also be correspondingly small. (How latency will be affected is 
unknown until further testing, but more than likely, this option will be 
configurable depending on user requirements.)

 

 

  was:
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate plan in between these two 
algorithms, and that is the leader replica quorum. In essence, there will be 
multiple leaders (which have the power for both read and writes), but the 
number of leaders will not be excessive (i.e. maybe three or five at max). How 
we achieve consistency is simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must elect if 
such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If say, more than half the other leaders have agreed that the current change 
is good to go, then we can forward the change downstream to the other replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single leader failure bottleneck has been alleviated to a certain 
extent, since there are now multiple leader replicas.
 # Write updates will no longer be bottlenecked at one single leader (since 
there are mu

[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the consensus algorithms used in databases 
(RAFT), with the major difference being the existence of the ISR. Consequently, 
Kafka suffers from the same fault-tolerance issues as other distributed systems 
that rely on RAFT: the leader tends to be the chokepoint for failures, i.e., if 
it goes down, there will be a brief stop-the-world effect. 

In contrast, giving all replicas the power to read from and write to other 
replicas is also difficult to accomplish (as emphasized by the complexity of 
the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
such an algorithm, and it offers very little gain compared to the overhead. 

Therefore, I propose an intermediate plan between these two algorithms: the 
leader replica quorum. In essence, there will be multiple leaders (which have 
the power for both reads and writes), but the number of leaders will not be 
excessive (i.e., maybe three or four at most). How we achieve consistency is 
simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must vote on 
whether such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If, say, more than half of the other leaders have agreed that the current 
change is good to go, then we can forward the change downstream to the other 
replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single-leader failure bottleneck is alleviated to a certain extent, 
since there are now multiple leader replicas.
 # Write updates will no longer be bottlenecked at a single leader (since 
there are multiple leaders available). On a related note, there has been a KIP 
that allows clients to read from non-leader replicas. (Will add the KIP link 
soon.)

Some might note that the overhead from maintaining consistency among multiple 
leaders might offset these gains. That might be true with a large number of 
leaders, but with a small number (capped at 3 or 4, as mentioned above), the 
overhead will also be correspondingly small. (How latency will be affected is 
unknown until further testing, but more than likely, this option will be 
configurable depending on user requirements.)

 

 

  was:
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate plan in between these two 
algorithms, and that is the leader replica quorum. In essence, there will be 
multiple leaders (which have the power for both read and writes), but the 
number of leaders will not be excessive (i.e. maybe three or four at max). How 
we achieve consistency is simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must elect if 
such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If say, more than half the other leaders have agreed that the current change 
is good to go, then we can forward the change downstream to the other replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single leader failure bottleneck has been alleviated to a certain 
extent, since there are now multiple leader replicas.
 # Write updates will no longer be bottlenecked at one single leader (since 
there are multiple leade

[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the consensus algorithms used in databases 
(RAFT), with the major difference being the existence of the ISR. Consequently, 
Kafka suffers from the same fault-tolerance issues as other distributed systems 
that rely on RAFT: the leader tends to be the chokepoint for failures, i.e., if 
it goes down, there will be a brief stop-the-world effect. 

In contrast, giving all replicas the power to read from and write to other 
replicas is also difficult to accomplish (as emphasized by the complexity of 
the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
such an algorithm, and it offers very little gain compared to the overhead. 

Therefore, I propose an intermediate plan between these two algorithms: the 
leader replica quorum. In essence, there will be multiple leaders (which have 
the power for both reads and writes), but the number of leaders will not be 
excessive (i.e., maybe three or four at most). How we achieve consistency is 
simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must vote on 
whether such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If, say, more than half of the other leaders have agreed that the current 
change is good to go, then we can forward the change downstream to the other 
replicas.

 The algorithm for maintaining consistency between multiple leaders will still 
have to be worked out in detail. However, there would be multiple gains from 
this design over the old model:
 # The single-leader failure bottleneck is alleviated to a certain extent, 
since there are now multiple leader replicas.
 # Write updates will no longer be bottlenecked at a single leader (since 
there are multiple leaders available). On a related note, there has been a KIP 
that allows clients to read from non-leader replicas. (Will add the KIP link 
soon.)

Some might note that the overhead from maintaining consistency among multiple 
leaders might offset these gains. That might be true with a large number of 
leaders, but with a small number (capped at 3 or 4, as mentioned above), the 
overhead will also be correspondingly small. (How latency will be affected is 
unknown until further testing, but more than likely, this option will be 
configurable depending on user requirements.)

 

 

  was:
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate step in between these two 
algorithms, and that is the leader replica quorum. In essence, there will be 
multiple leaders (which have the power for both read and writes), but the 
number of leaders will not be excessive (i.e. maybe three or four at max). How 
we achieve consistency is simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must elect if 
such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If say, more than half the other leaders have agreed that the current change 
is good to go, then we can 

 

 

 


> Consider addition of leader quorum in replication model
> ---
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Kafka's current replication model (with its single leader and several

[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate step in between these two 
algorithms, and that is the leader replica quorum. In essence, there will be 
multiple leaders (which have the power for both read and writes), but the 
number of leaders will not be excessive (i.e. maybe three or four at max). How 
we achieve consistency is simple:
 * Any leader has the power to propose a write update to other replicas. But 
before passing a write update to a follower, the other leaders must elect if 
such an operation is granted.
 * In principle, a leader will propose a write update to the other leaders, and 
once the other leaders have integrated that write update into their version of 
the stored data, they will also give the green light. 
 * If say, more than half the other leaders have agreed that the current change 
is good to go, then we can 

 

 

 

  was:
Note: Description still not finished. Still not sure if this is needed.

Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate step in between these two 
algorithms, and that is the leader partition quorum.

 


> Consider addition of leader quorum in replication model
> ---
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Kafka's current replication model (with its single leader and several 
> followers) is somewhat similar to the current consensus algorithms being used 
> in databases (RAFT) with the major difference being the existence of the ISR. 
> Consequently, Kafka suffers from the same fault tolerance issues as does 
> other distributed systems which rely on RAFT: the leader tends to be the 
> chokepoint for failures i.e. if it goes down, it will have a brief 
> stop-the-world effect. 
> In contrast, giving all replicas the power to write and read to other 
> replicas is also difficult to accomplish (as emphasized by the complexity of 
> the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
> such an algorithm, plus very little gain compared to the overhead. 
> Therefore, I propose that we have an intermediate step in between these two 
> algorithms, and that is the leader replica quorum. In essence, there will be 
> multiple leaders (which have the power for both read and writes), but the 
> number of leaders will not be excessive (i.e. maybe three or four at max). 
> How we achieve consistency is simple:
>  * Any leader has the power to propose a write update to other replicas. But 
> before passing a write update to a follower, the other leaders must elect if 
> such an operation is granted.
>  * In principle, a leader will propose a write update to the other leaders, 
> and once the other leaders have integrated that write update into their 
> version of the stored data, they will also give the green light. 
>  * If say, more than half the other leaders have agreed that the current 
> change is good to go, then we can 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9733) Consider addition of leader quorum in replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Summary: Consider addition of leader quorum in replication model  (was: 
Consider addition of leader partition quorum)

> Consider addition of leader quorum in replication model
> ---
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Note: Description still not finished. Still not sure if this is needed.
> Kafka's current replication model (with its single leader and several 
> followers) is somewhat similar to the current consensus algorithms being used 
> in databases (RAFT) with the major difference being the existence of the ISR. 
> Consequently, Kafka suffers from the same fault tolerance issues as does 
> other distributed systems which rely on RAFT: the leader tends to be the 
> chokepoint for failures i.e. if it goes down, it will have a brief 
> stop-the-world effect. 
> In contrast, giving all replicas the power to write and read to other 
> replicas is also difficult to accomplish (as emphasized by the complexity of 
> the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
> such an algorithm, plus very little gain compared to the overhead. 
> Therefore, I propose that we have an intermediate step in between these two 
> algorithms, and that is the leader partition quorum.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9733) Consider addition of leader partition quorum

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Note: Description still not finished. Still not sure if this is needed.

Kafka's current replication model (with its single leader and several 
followers) is somewhat similar to the current consensus algorithms being used 
in databases (RAFT) with the major difference being the existence of the ISR. 
Consequently, Kafka suffers from the same fault tolerance issues as does other 
distributed systems which rely on RAFT: the leader tends to be the chokepoint 
for failures i.e. if it goes down, it will have a brief stop-the-world effect. 

In contrast, giving all replicas the power to write and read to other replicas 
is also difficult to accomplish (as emphasized by the complexity of the 
Egalitarian Paxos algorithm), since consistency is so hard to maintain in such 
an algorithm, plus very little gain compared to the overhead. 

Therefore, I propose that we have an intermediate step in between these two 
algorithms, and that is the leader partition quorum.

 

  was:
Note: Description still not finished. Still not sure if this is needed.

This feature I'm proposing might not offer too much of a performance boost, but 
I think it is still worth considering. In our current replication model, we 
have a single leader and several followers (with our ISR included). However, 
the current bottleneck would be that once the leader goes down, it will take a 
while to get the next leader online, which is a serious pain. (also leading to 
a considerable write/read delay)

In order to help alleviate this issue, we can consider multiple clusters 
independent of each other i.e. each of them are their own leader/follower group 
for the _same partition set_. The difference here is that these clusters can 
_communicate_ between one another. 

At first, this might seem redundant, but there is a reasoning to this:
 # Let's say we have two leader/follower groups (I must note that these two 
groups does _not_ have shared memory) for the same replicated partition.
 # One leader goes down, and that means for the respective followers, they 
would under normal circumstances be unable to receive new write updates.
 # However, in this situation, we can have those followers poll their 
write/read requests from the other group whose leader has _not gone down._ It 
doesn't necessarily have to be  the leader either, it can be other members from 
that group's ISR. 
 # The idea here is that if the members of these two groups detect that they 
are lagging behind another, they would be able to poll one another for updates.

So what is the difference here from just having multiple leaders in a single 
cluster?

The answer is that the leader is responsible for making sure that there is 
consistency within _its own cluster._ Not the other cluster it is in 
communication with.  


> Consider addition of leader partition quorum
> 
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Note: Description still not finished. Still not sure if this is needed.
> Kafka's current replication model (with its single leader and several 
> followers) is somewhat similar to the current consensus algorithms being used 
> in databases (RAFT) with the major difference being the existence of the ISR. 
> Consequently, Kafka suffers from the same fault tolerance issues as does 
> other distributed systems which rely on RAFT: the leader tends to be the 
> chokepoint for failures i.e. if it goes down, it will have a brief 
> stop-the-world effect. 
> In contrast, giving all replicas the power to write and read to other 
> replicas is also difficult to accomplish (as emphasized by the complexity of 
> the Egalitarian Paxos algorithm), since consistency is so hard to maintain in 
> such an algorithm, plus very little gain compared to the overhead. 
> Therefore, I propose that we have an intermediate step in between these two 
> algorithms, and that is the leader partition quorum.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9733) Consider addition of leader partition quorum

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Summary: Consider addition of leader partition quorum  (was: Consider 
addition to Kafka's replication model)

> Consider addition of leader partition quorum
> 
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Note: Description still not finished. Still not sure if this is needed.
> This feature I'm proposing might not offer too much of a performance boost, 
> but I think it is still worth considering. In our current replication model, 
> we have a single leader and several followers (with our ISR included). 
> However, the current bottleneck would be that once the leader goes down, it 
> will take a while to get the next leader online, which is a serious pain 
> (also leading to a considerable write/read delay).
> In order to help alleviate this issue, we can consider multiple clusters 
> independent of each other, i.e., each of them is its own leader/follower 
> group for the _same partition set_. The difference here is that these 
> clusters can _communicate_ with one another. 
> At first, this might seem redundant, but there is a reason for this:
>  # Let's say we have two leader/follower groups (I must note that these two 
> groups do _not_ have shared memory) for the same replicated partition.
>  # One leader goes down, which means that its followers would, under normal 
> circumstances, be unable to receive new write updates.
>  # However, in this situation, we can have those followers poll their 
> write/read requests from the other group whose leader has _not gone down._ It 
> doesn't necessarily have to be the leader either; it can be other members 
> from that group's ISR. 
>  # The idea here is that if the members of these two groups detect that they 
> are lagging behind one another, they would be able to poll one another for 
> updates.
> So what is the difference here from just having multiple leaders in a single 
> cluster?
> The answer is that the leader is responsible for making sure that there is 
> consistency within _its own cluster_, not the other cluster it is in 
> communication with.  
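
A hypothetical sketch of the fallback described above: a follower that normally fetches from its own group's leader polls a live replica in the other group when its local leader is down. None of these types exist in Kafka; this only makes the intended behaviour concrete.

{code:java}
import java.util.Collections;
import java.util.List;

// Stand-in abstraction for "a replica we can fetch records from".
interface GroupReplica {
    boolean isAlive();
    List<byte[]> fetchSince(long offset);
}

class PeerGroupAwareFollower {
    private final GroupReplica localLeader;
    private final List<GroupReplica> peerGroupIsr;   // replicas of the *other* group
    private long fetchedOffset = 0L;

    PeerGroupAwareFollower(GroupReplica localLeader, List<GroupReplica> peerGroupIsr) {
        this.localLeader = localLeader;
        this.peerGroupIsr = peerGroupIsr;
    }

    List<byte[]> poll() {
        GroupReplica source = localLeader;
        if (!source.isAlive()) {
            // fall back to any live member of the peer group's ISR
            source = peerGroupIsr.stream()
                                 .filter(GroupReplica::isAlive)
                                 .findFirst()
                                 .orElse(null);
        }
        if (source == null) {
            return Collections.emptyList();   // nothing reachable; try again later
        }
        List<byte[]> records = source.fetchSince(fetchedOffset);
        fetchedOffset += records.size();
        return records;
    }
}
{code}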



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9733) Consider addition to Kafka's replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Note: Description still not finished. Still not sure if this is needed.

This feature I'm proposing might not offer too much of a performance boost, but 
I think it is still worth considering. In our current replication model, we 
have a single leader and several followers (with our ISR included). However, 
the current bottleneck would be that once the leader goes down, it will take a 
while to get the next leader online, which is a serious pain. (also leading to 
a considerable write/read delay)

In order to help alleviate this issue, we can consider multiple clusters 
independent of each other, i.e. each of them is its own leader/follower group 
for the _same partition set_. The difference here is that these clusters can 
_communicate_ with one another. 

At first, this might seem redundant, but there is a reason for it:
 # Let's say we have two leader/follower groups (I must note that these two 
groups do _not_ have shared memory) for the same replicated partition.
 # One leader goes down, which means its followers would, under normal 
circumstances, be unable to receive new write updates.
 # However, in this situation, we can have those followers poll their 
write/read requests from the other group, whose leader has _not gone down._ It 
doesn't necessarily have to be the leader either; it can be other members of 
that group's ISR. 
 # The idea here is that if the members of these two groups detect that they 
are lagging behind one another, they would be able to poll each other for updates.

So what is the difference here from just having multiple leaders in a single 
cluster?

The answer is that the leader is responsible for making sure that there is 
consistency within _its own cluster._ Not the other cluster it is in 
communication with.  

  was:
Note: Description still not finished.

This feature I'm proposing might not offer too much of a performance boost, but 
I think it is still worth considering. In our current replication model, we 
have a single leader and several followers (with our ISR included). However, 
the current bottleneck would be that once the leader goes down, it will take a 
while to get the next leader online, which is a serious pain. (also leading to 
a considerable write/read delay)

In order to help alleviate this issue, we can consider multiple clusters 
independent of each other, i.e. each of them is its own leader/follower group 
for the _same partition set_. The difference here is that these clusters can 
_communicate_ with one another. 

At first, this might seem redundant, but there is a reason for it:
 # Let's say we have two leader/follower groups (I must note that these two 
groups do _not_ have shared memory) for the same replicated partition.
 # One leader goes down, which means its followers would, under normal 
circumstances, be unable to receive new write updates.
 # However, in this situation, we can have those followers poll their 
write/read requests from the other group, whose leader has _not gone down._ It 
doesn't necessarily have to be the leader either; it can be other members of 
that group's ISR. 
 # The idea here is that if the members of these two groups detect that they 
are lagging behind one another, they would be able to poll each other for updates.

So what is the difference here from just having multiple leaders in a single 
cluster?

The answer is that the leader is responsible for making sure that there is 
consistency within _its own cluster._ Not the other cluster it is in 
communication with.  


> Consider addition to Kafka's replication model
> --
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Note: Description still not finished. Still not sure if this is needed.
> This feature I'm proposing might not offer too much of a performance boost, 
> but I think it is still worth considering. In our current replication model, 
> we have a single leader and several followers (with our ISR included). 
> However, the current bottleneck would be that once the leader goes down, it 
> will take a while to get the next leader online, which is a serious pain. 
> (also leading to a considerable write/read delay)
> In order to help alleviate this issue, we can consider multiple clusters 
> independent of each other, i.e. each of them is its own leader/follower 
> group for the _same partition set_. The difference here is that these 
> clusters can _communicate_ with one another. 
> At first, this might seem redund

[jira] [Updated] (KAFKA-9733) Consider addition to Kafka's replication model

2020-03-18 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9733:
--
Description: 
Note: Description still not finished.

This feature I'm proposing might not offer too much of a performance boost, but 
I think it is still worth considering. In our current replication model, we 
have a single leader and several followers (with our ISR included). However, 
the current bottleneck would be that once the leader goes down, it will take a 
while to get the next leader online, which is a serious pain. (also leading to 
a considerable write/read delay)

In order to help alleviate this issue, we can consider multiple clusters 
independent of each other, i.e. each of them is its own leader/follower group 
for the _same partition set_. The difference here is that these clusters can 
_communicate_ with one another. 

At first, this might seem redundant, but there is a reason for it:
 # Let's say we have two leader/follower groups (I must note that these two 
groups do _not_ have shared memory) for the same replicated partition.
 # One leader goes down, which means its followers would, under normal 
circumstances, be unable to receive new write updates.
 # However, in this situation, we can have those followers poll their 
write/read requests from the other group, whose leader has _not gone down._ It 
doesn't necessarily have to be the leader either; it can be other members of 
that group's ISR. 
 # The idea here is that if the members of these two groups detect that they 
are lagging behind one another, they would be able to poll each other for updates.

So what is the difference here from just having multiple leaders in a single 
cluster?

The answer is that the leader is responsible for making sure that there is 
consistency within _its own cluster._ Not the other cluster it is in 
communication with.  

  was:
Note: Description still not finished.

This feature I'm proposing might not offer too much of a performance boost, but 
I think it is still worth considering. In our current replication model, we 
have a single leader and several followers (with our ISR included). However, 
the current bottleneck would be that once the leader goes down, it will take a 
while to get the next leader online, which is a serious pain. (also leading to 
a considerable write/read delay)

In order to help alleviate this issue, we can consider multiple clusters 
independent of each other, i.e. each of them is its own leader/follower group 
for the _same partition set_. The difference here is that these clusters can 
_communicate_ with one another. 

At first, this might seem redundant, but there is a reason for it:
 # Let's say we have two leader/follower groups for the same replicated 
partition.
 # One leader goes down, which means its followers would, under normal 
circumstances, be unable to receive new write updates.
 # However, in this situation, we can have those followers poll their 
write/read requests from the other group, whose leader has _not gone down._ It 
doesn't necessarily have to be the leader either; it can be other members of 
that group's ISR. 
 # The idea here is that if the members of these two groups detect that they 
are lagging behind one another, they would be able to poll each other for updates.

So what is the difference here from just having multiple leaders in a single 
cluster?

The answer is that the leader is responsible for making sure that there is 
consistency within _its own cluster._ Not the other cluster it is in 
communication with.  


> Consider addition to Kafka's replication model
> --
>
> Key: KAFKA-9733
> URL: https://issues.apache.org/jira/browse/KAFKA-9733
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Richard Yu
>Priority: Minor
>
> Note: Description still not finished.
> This feature I'm proposing might not offer too much of a performance boost, 
> but I think it is still worth considering. In our current replication model, 
> we have a single leader and several followers (with our ISR included). 
> However, the current bottleneck would be that once the leader goes down, it 
> will take a while to get the next leader online, which is a serious pain. 
> (also leading to a considerable write/read delay)
> In order to help alleviate this issue, we can consider multiple clusters 
> independent of each other, i.e. each of them is its own leader/follower 
> group for the _same partition set_. The difference here is that these 
> clusters can _communicate_ with one another. 
> At first, this might seem redundant, but there is a reason for it:
>  # Let's say we have two leader/follower groups (I must note that these two 
> groups does _n

[jira] [Created] (KAFKA-9733) Consider addition to Kafka's replication model

2020-03-18 Thread Richard Yu (Jira)
Richard Yu created KAFKA-9733:
-

 Summary: Consider addition to Kafka's replication model
 Key: KAFKA-9733
 URL: https://issues.apache.org/jira/browse/KAFKA-9733
 Project: Kafka
  Issue Type: New Feature
  Components: clients, core
Reporter: Richard Yu


Note: Description still not finished.

This feature I'm proposing might not offer too much of a performance boost, but 
I think it is still worth considering. In our current replication model, we 
have a single leader and several followers (with our ISR included). However, 
the current bottleneck would be that once the leader goes down, it will take a 
while to get the next leader online, which is a serious pain. (also leading to 
a considerable write/read delay)

In order to help alleviate this issue, we can consider multiple clusters 
independent of each other, i.e. each of them is its own leader/follower group 
for the _same partition set_. The difference here is that these clusters can 
_communicate_ with one another. 

At first, this might seem redundant, but there is a reason for it:
 # Let's say we have two leader/follower groups for the same replicated 
partition.
 # One leader goes down, which means its followers would, under normal 
circumstances, be unable to receive new write updates.
 # However, in this situation, we can have those followers poll their 
write/read requests from the other group, whose leader has _not gone down._ It 
doesn't necessarily have to be the leader either; it can be other members of 
that group's ISR. 
 # The idea here is that if the members of these two groups detect that they 
are lagging behind one another, they would be able to poll each other for updates.

So what is the difference here from just having multiple leaders in a single 
cluster?

The answer is that the leader is responsible for making sure that there is 
consistency within _its own cluster._ Not the other cluster it is in 
communication with.  
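
Purely as an illustration of the fallback described above, here is a minimal 
Java sketch. The {{ReplicaEndpoint}} and {{CrossGroupFollower}} names are 
hypothetical, and this is not Kafka's actual replication code; it only shows 
the decision a follower would make when its own group's leader is unreachable.

{noformat}
import java.util.List;
import java.util.Optional;

// Hypothetical endpoint abstraction; not a real Kafka interface.
interface ReplicaEndpoint {
    boolean isReachable();
    List<byte[]> fetchSince(long offset); // records appended after the given offset
}

// A follower that normally fetches from its own group's leader, but falls back
// to any reachable ISR member of the peer group when that leader is down.
public class CrossGroupFollower {
    private final ReplicaEndpoint localLeader;
    private final List<ReplicaEndpoint> peerGroupIsr;
    private long fetchedOffset = 0L;

    public CrossGroupFollower(ReplicaEndpoint localLeader, List<ReplicaEndpoint> peerGroupIsr) {
        this.localLeader = localLeader;
        this.peerGroupIsr = peerGroupIsr;
    }

    public List<byte[]> poll() {
        ReplicaEndpoint source = localLeader.isReachable()
                ? localLeader
                : firstReachablePeer().orElseThrow(
                        () -> new IllegalStateException("no reachable replica in either group"));
        List<byte[]> records = source.fetchSince(fetchedOffset);
        fetchedOffset += records.size();
        return records;
    }

    private Optional<ReplicaEndpoint> firstReachablePeer() {
        return peerGroupIsr.stream().filter(ReplicaEndpoint::isReachable).findFirst();
    }
}
{noformat}

The sketch only covers the choice of fetch source; consistency within each 
group would still be the responsibility of that group's own leader, as noted 
above.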



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-01-10 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013370#comment-17013370
 ] 

Richard Yu edited comment on KAFKA-8770 at 1/11/20 4:23 AM:


I have created a draft KIP for this JIRA. [~xmar]  [~mjsax] [~vvcephei] Input 
would be greatly appreciated! Right now, I've not formalized any API additions 
/ configuration changes. It would be good to first get some discussion on 
what is needed and what is not! 

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]

 


was (Author: yohan123):
I have created a draft KIP for this JIRA. [~xmar]  [~mjsax] [~vvcephei] Input 
would be greatly appreciated! Right now, I've not formalized any API additions 
/ configuration changes. It would be good to first get some discussion on 
what is needed and what is not! 

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-NUM%3A+Add+emit+on+change+support+for+Kafka+Streams#KIP-NUM:AddemitonchangesupportforKafkaStreams-DetailsonCoreImprovement]

 

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-01-10 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013370#comment-17013370
 ] 

Richard Yu commented on KAFKA-8770:
---

I have created a draft KIP for this JIRA. [~xmar]  [~mjsax] [~vvcephei] Input 
would be greatly appreciated! Right now, I've not formalized any API additions 
/ configuration changes. It would be good to first get some discussion on 
what is needed and what is not! 

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-NUM%3A+Add+emit+on+change+support+for+Kafka+Streams#KIP-NUM:AddemitonchangesupportforKafkaStreams-DetailsonCoreImprovement]

 

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-01-10 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013230#comment-17013230
 ] 

Richard Yu edited comment on KAFKA-8770 at 1/10/20 10:09 PM:
-

[~vvcephei] Actually, I did think of something which might be very useful as a 
performance enhancement. As mentioned in the JIRA description, Kafka Streams 
would load prior results and compare them to the new result. However, that 
nonetheless has the potential to be a severe hit to processing speed. I propose 
that instead of loading the prior results, we just get the hash code of that 
prior result.

If there is a no-op, the hash code of the prior result would be the same as the 
one that we have currently. However, if the result has _changed,_ then as long 
as the hash code function has been implemented correctly, the hash code would 
have changed correspondingly as well. Therefore, what should be done is the 
following:
 # We keep the hash codes of prior results in some store / whatever other 
device we might be able to use for storage. 
 # Whenever we obtain a new processed result, retrieve the corresponding prior 
hash code to see if it has changed. 
 # Update the store / table as necessary if the hash code has changed. 


was (Author: yohan123):
[~vvcephei] Actually, I did think of something which might be very useful as a 
performance enhancement. As mentioned in the JIRA description, Kafka Streams 
would load prior results and compare them to the new result. However, that 
nonetheless has the potential to be a severe hit to processing speed. I propose 
that instead of loading the prior results, we just get the hash code of that 
prior result.

If there is a no-op, the hash code of the prior result would be the same as the 
one that we have currently. However, if the result has _changed,_ then as long 
as the hash code function has been implemented correctly, the hash code would 
have changed correspondingly as well. Therefore, what should be done is the 
following:
 # We keep the hash codes of prior results in some store / whatever other 
device we might be able to use for storage. 
 # Whenever we obtain a new processed result, retrieve the corresponding prior 
hash code to see if it has changed. 
 # Update the store / table as necessary if the hash code has changed.

 

 

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-01-10 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013230#comment-17013230
 ] 

Richard Yu commented on KAFKA-8770:
---

[~vvcephei] Actually, I did think of something which might be very useful as a 
performance enhancement. As mentioned in the JIRA description, Kafka Streams 
would load prior results and compare them to the new result. However, that 
nonetheless has the potential to be a severe hit to processing speed. I propose 
that instead of loading the prior results, we just get the hash code of that 
prior result.

If there is a no-op, the hash code of the prior result would be the same as the 
one that we have currently. However, if the result has _changed,_ then as long 
as the hash code function has been implemented correctly, the hash code would 
have changed correspondingly as well. Therefore, what should be done is the 
following:
 # We keep the hash codes of prior results in some store / whatever other 
device we might be able to use for storage. 
 # Whenever we obtain a new processed result, retrieve the corresponding prior 
hash code to see if it has changed. 
 # Update the store / table as necessary if the hash code has changed (a rough 
sketch of this check follows below).
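
Purely as an illustration of steps 1-3 above, here is a minimal Java sketch of 
the hash-based check. The class and method names are hypothetical, and a plain 
{{HashMap}} stands in for whatever store would actually hold the prior hashes; 
this is not meant as the final design.

{noformat}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

// Hypothetical helper that forwards a result downstream only when its hash code
// differs from the hash stored for the prior result of the same key.
public class EmitOnChangeFilter<K, V> {
    private final Map<K, Integer> priorHashes = new HashMap<>(); // stand-in for a state store

    public void maybeForward(K key, V newResult, BiConsumer<K, V> downstream) {
        int newHash = Objects.hashCode(newResult);
        Integer priorHash = priorHashes.get(key);
        if (priorHash != null && priorHash == newHash) {
            // Same hash as the prior result: treat it as a no-op and drop it.
            // Caveat: a hash collision would wrongly suppress a genuine change.
            return;
        }
        priorHashes.put(key, newHash);      // step 3: update the stored hash
        downstream.accept(key, newResult);  // emit the changed result
    }
}
{noformat}

Note the collision caveat in the comment: storing only the hash trades a small 
risk of suppressing a real update for not having to load the full prior value.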

 

 

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-01-03 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007860#comment-17007860
 ] 

Richard Yu commented on KAFKA-8770:
---

[~vvcephei] It seems that loading all the prior results would be wasteful. From 
what I can tell, we could pursue a caching strategy here, i.e. use 
{{CachingKeyValueStore}} or the like, if we want to implement this feature. 
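
For what it's worth, here is a minimal sketch of that compare-before-forward 
idea, with a plain map standing in for a cached store such as 
{{CachingKeyValueStore}}. The names are hypothetical and this only shows the 
shape of the check, not an actual implementation.

{noformat}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

// Hypothetical value-comparison variant: keep the full prior result (ideally in
// a cached store so most lookups stay in memory) and forward only on a change.
public class CompareBeforeForward<K, V> {
    private final Map<K, V> priorResults = new HashMap<>(); // stand-in for a caching store

    public void process(K key, V newResult, BiConsumer<K, V> downstream) {
        V prior = priorResults.get(key);
        if (Objects.equals(prior, newResult)) {
            return; // identical to the prior result: a no-op, nothing is emitted
        }
        priorResults.put(key, newResult);   // persist the new result
        downstream.accept(key, newResult);  // forward only the real change
    }
}
{noformat}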

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-07 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9285:
--
Comment: was deleted

(was: It looks like I didn't do my research. Looks like this is already fixed 
by Kafka Connect.)

> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Major
>  Labels: kip
>
> In Kafka's current failure model, when a consumer crashes, the user is 
> typically responsible for both detecting and restarting the failed consumer. 
> Therefore, while the consumer is dead, there is a period of inactivity where 
> no records are consumed, and lag builds up. Previously, there have been 
> attempts to resolve this problem: when a failure is detected by the broker, a 
> substitute consumer will 
> be started (the so-called [Rebalance 
> Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
>  which will continue processing records in Kafka's stead. 
> However, this has complications, as records will only be stored locally, and 
> in case of this consumer failing as well, that data will be lost. Instead, we 
> need to consider how we can still process these records and at the same time 
> effectively _persist_ them. It is here that I propose the concept of a 
> _failed message topic._ At a high level, it works like this. When we find 
> that a consumer has failed, messages which were originally meant to be sent 
> to that consumer would be redirected to this failed message topic. The user can 
> choose to assign consumers to this topic, which would consume messages (that 
> would've originally been processed by the failed consumers) from it.
> Naturally, records from different topics can not go into the same failed 
> message topic, since we cannot tell which records belong to which consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-07 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu resolved KAFKA-9285.
---
Resolution: Fixed

Already resolved by Kafka Connect.

> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Major
>  Labels: kip
>
> In Kafka's current failure model, when a consumer crashes, the user is 
> typically responsible for both detecting and restarting the failed consumer. 
> Therefore, while the consumer is dead, there is a period of inactivity where 
> no records are consumed, and lag builds up. Previously, there have been 
> attempts to resolve this problem: when a failure is detected by the broker, a 
> substitute consumer will 
> be started (the so-called [Rebalance 
> Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
>  which will continue processing records in Kafka's stead. 
> However, this has complications, as records will only be stored locally, and 
> in case of this consumer failing as well, that data will be lost. Instead, we 
> need to consider how we can still process these records and at the same time 
> effectively _persist_ them. It is here that I propose the concept of a 
> _failed message topic._ At a high level, it works like this. When we find 
> that a consumer has failed, messages which were originally meant to be sent 
> to that consumer would be redirected to this failed message topic. The user can 
> choose to assign consumers to this topic, which would consume messages (that 
> would've originally been processed by the failed consumers) from it.
> Naturally, records from different topics can not go into the same failed 
> message topic, since we cannot tell which records belong to which consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-07 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990705#comment-16990705
 ] 

Richard Yu commented on KAFKA-9285:
---

It looks like I didn't do my research. Looks like this is already fixed by 
Kafka Connect.

> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Major
>  Labels: kip
>
> In Kafka's current failure model, when a consumer crashes, the user is 
> typically responsible for both detecting and restarting the failed consumer. 
> Therefore, while the consumer is dead, there is a period of inactivity where 
> no records are consumed, and lag builds up. Previously, there have been 
> attempts to resolve this problem: when a failure is detected by the broker, a 
> substitute consumer will 
> be started (the so-called [Rebalance 
> Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
>  which will continue processing records in Kafka's stead. 
> However, this has complications, as records will only be stored locally, and 
> in case of this consumer failing as well, that data will be lost. Instead, we 
> need to consider how we can still process these records and at the same time 
> effectively _persist_ them. It is here that I propose the concept of a 
> _failed message topic._ At a high level, it works like this. When we find 
> that a consumer has failed, messages which were originally meant to be sent 
> to that consumer would be redirected to this failed message topic. The user can 
> choose to assign consumers to this topic, which would consume messages (that 
> would've originally been processed by the failed consumers) from it.
> Naturally, records from different topics can not go into the same failed 
> message topic, since we cannot tell which records belong to which consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-06 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9285:
--
Labels: kip  (was: )

> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Major
>  Labels: kip
>
> In Kafka's current failure model, when a consumer crashes, the user is 
> typically responsible for both detecting and restarting the failed consumer. 
> Therefore, while the consumer is dead, there is a period of inactivity where 
> no records are consumed, and lag builds up. Previously, there have been 
> attempts to resolve this problem: when a failure is detected by the broker, a 
> substitute consumer will 
> be started (the so-called [Rebalance 
> Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
>  which will continue processing records in Kafka's stead. 
> However, this has complications, as records will only be stored locally, and 
> in case of this consumer failing as well, that data will be lost. Instead, we 
> need to consider how we can still process these records and at the same time 
> effectively _persist_ them. It is here that I propose the concept of a 
> _failed message topic._ At a high level, it works like this. When we find 
> that a consumer has failed, messages which were originally meant to be sent 
> to that consumer would be redirected to this failed message topic. The user can 
> choose to assign consumers to this topic, which would consume messages (that 
> would've originally been processed by the failed consumers) from it.
> Naturally, records from different topics can not go into the same failed 
> message topic, since we cannot tell which records belong to which consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-06 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-9285:
-

Assignee: Richard Yu

> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Major
>
> In Kafka's current failure model, when a consumer crashes, the user is 
> typically responsible for both detecting and restarting the failed consumer. 
> Therefore, while the consumer is dead, there is a period of inactivity where 
> no records are consumed, and lag builds up. Previously, there have been 
> attempts to resolve this problem: when a failure is detected by the broker, a 
> substitute consumer will 
> be started (the so-called [Rebalance 
> Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
>  which will continue processing records in Kafka's stead. 
> However, this has complications, as records will only be stored locally, and 
> in case of this consumer failing as well, that data will be lost. Instead, we 
> need to consider how we can still process these records and at the same time 
> effectively _persist_ them. It is here that I propose the concept of a 
> _failed message topic._ At a high level, it works like this. When we find 
> that a consumer has failed, messages which were originally meant to be sent 
> to that consumer would be redirected to this failed message topic. The user can 
> choose to assign consumers to this topic, which would consume messages (that 
> would've originally been processed by the failed consumers) from it.
> Naturally, records from different topics can not go into the same failed 
> message topic, since we cannot tell which records belong to which consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-06 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-9285:
--
Description: 
In Kafka's current failure model, when a consumer crashes, the user is 
typically responsible for both detecting and restarting the failed consumer. 
Therefore, while the consumer is dead, there is a period of inactivity where no 
records are consumed, and lag builds up. Previously, there have been attempts 
to resolve this problem: when a failure is detected by the broker, a substitute 
consumer will be started (the 
so-called [Rebalance 
Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
 which will continue processing records in Kafka's stead. 

However, this has complications, as records will only be stored locally, and in 
case of this consumer failing as well, that data will be lost. Instead, we need 
to consider how we can still process these records and at the same time 
effectively _persist_ them. It is here that I propose the concept of a _failed 
message topic._ At a high level, it works like this. When we find that a 
consumer has failed, messages which were originally meant to be sent to that 
consumer would be redirected to this failed message topic. The user can choose 
to assign consumers to this topic, which would consume messages (that would've 
originally been processed by the failed consumers) from it.

Naturally, records from different topics can not go into the same failed 
message topic, since we cannot tell which records belong to which consumer.

 

  was:
In Kafka's current failure model, when a consumer crashes, the user is 
typically responsible for both detecting and restarting the failed consumer. 
Therefore, while the consumer is dead, there is a period of inactivity where no 
records are consumed, and lag builds up. Previously, there have been attempts 
to resolve this problem: when a failure is detected by the broker, a substitute 
consumer will be started (the 
so-called [Rebalance 
Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
 which will continue processing records in Kafka's stead. 

However, this has complications, as records will only be stored locally, and in 
case of this consumer failing as well, that data will be lost. Instead, we need 
to consider how we can still process these records and at the same time 
effectively _persist_ them. It is here that I propose the concept of a _failed 
message topic._ At a high level, it works like this. When we find that a 
consumer has failed, messages which were originally meant to be sent to that 
consumer would be redirected to this failed message topic. The user can choose 
to assign consumers to this topic, which would consume messages from failed 
consumers while other consumer threads are down. 

Naturally, records from different topics can not go into the same failed 
message topic, since we cannot tell which records belong to which consumer.

 


> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>
> In Kafka's current failure model, when a consumer crashes, the user is 
> typically responsible for both detecting and restarting the failed consumer. 
> Therefore, while the consumer is dead, there is a period of inactivity where 
> no records are consumed, and lag builds up. Previously, there have been 
> attempts to resolve this problem: when a failure is detected by the broker, a 
> substitute consumer will 
> be started (the so-called [Rebalance 
> Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
>  which will continue processing records in Kafka's stead. 
> However, this has complications, as records will only be stored locally, and 
> in case of this consumer failing as well, that data will be lost. Instead, we 
> need to consider how we can still process these records and at the same time 
> effectively _persist_ them. It is here that I propose the concept of a 
> _failed message topic._ At a high level, it works like this. When we find 
> that a consumer has failed, messages which were originally meant to be sent 
> to that consumer would be redirected to this failed message topic. The user can 
> choose to assign consumers to this topic, which would consume messages (that 
> would've originally been processed by the failed cons

[jira] [Commented] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-06 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990303#comment-16990303
 ] 

Richard Yu commented on KAFKA-9285:
---

cc [~bchen225242] You might be interested in this.

> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>
> In Kafka's current failure model, when a consumer crashes, the user is 
> typically responsible for both detecting and restarting the failed consumer. 
> Therefore, while the consumer is dead, there is a period of inactivity where 
> no records are consumed, and lag builds up. Previously, there have been 
> attempts to resolve this problem: when a failure is detected by the broker, a 
> substitute consumer will 
> be started (the so-called [Rebalance 
> Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
>  which will continue processing records in Kafka's stead. 
> However, this has complications, as records will only be stored locally, and 
> in case of this consumer failing as well, that data will be lost. Instead, we 
> need to consider how we can still process these records and at the same time 
> effectively _persist_ them. It is here that I propose the concept of a 
> _failed message topic._ At a high level, it works like this. When we find 
> that a consumer has failed, messages which were originally meant to be sent 
> to that consumer would be redirected to this failed message topic. The user can 
> choose to assign consumers to this topic, which would consume messages from 
> failed consumers while other consumer threads are down. 
> Naturally, records from different topics can not go into the same failed 
> message topic, since we cannot tell which records belong to which consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-06 Thread Richard Yu (Jira)
Richard Yu created KAFKA-9285:
-

 Summary: Implement failed message topic to account for processing 
lag during failure
 Key: KAFKA-9285
 URL: https://issues.apache.org/jira/browse/KAFKA-9285
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Reporter: Richard Yu


In Kafka's current failure model, when a consumer crashes, the user is 
typically responsible for both detecting and restarting the failed consumer. 
Therefore, while the consumer is dead, there is a period of inactivity where no 
records are consumed, and lag builds up. Previously, there have been attempts 
to resolve this problem: when a failure is detected by the broker, a substitute 
consumer will be started (the 
so-called [Rebalance 
Consumer|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]])
 which will continue processing records in Kafka's stead. 

However, this has complications, as records will only be stored locally, and in 
case of this consumer failing as well, that data will be lost. Instead, we need 
to consider how we can still process these records and at the same time 
effectively _persist_ them. It is here that I propose the concept of a _failed 
message topic._ At a high level, it works like this. When we find that a 
consumer has failed, messages which were originally meant to be sent to that 
consumer would be redirected to this failed message topic. The user can choose 
to assign consumers to this topic, which would consume messages from failed 
consumers while other consumer threads are down. 

Naturally, records from different topics can not go into the same failed 
message topic, since we cannot tell which records belong to which consumer.
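
Purely as an illustration of the redirection step described above: the topic 
naming convention and the liveness flag below are made up, and this is 
ordinary producer-side code (assuming the kafka-clients producer API) rather 
than the broker-side mechanism the proposal would actually need.

{noformat}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative only: redirect a record to a per-topic "failed message" topic
// when the consumer that should have processed it is known to be down.
public class FailedMessageRedirector<K, V> {
    private final Producer<K, V> producer;

    public FailedMessageRedirector(Producer<K, V> producer) {
        this.producer = producer;
    }

    public void route(String originalTopic, K key, V value, boolean consumerIsAlive) {
        // Records from different source topics go to different failed-message
        // topics, so we can still tell which consumer they were meant for.
        String targetTopic = consumerIsAlive
                ? originalTopic
                : originalTopic + ".failed-messages";
        producer.send(new ProducerRecord<>(targetTopic, key, value));
    }
}
{noformat}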

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-30 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985408#comment-16985408
 ] 

Richard Yu commented on KAFKA-8769:
---

[~vvcephei] [~mjsax] I'm not sure if you read the KIP, but it explicitly 
states that the user is given control over which system of tracking they want. 
If they don't want per-key stream time tracking, they can disable it (or enable 
it if they wish; we can have our default policy remain per-partition tracking).

Seems like some users still have problems with per-partition tracking anyway, 
so I'd imagine that this would be a welcome fix for them. There are workarounds 
which avoid making the low-traffic problem worse, as I mentioned in the earlier 
discussion, but I don't know if that is acceptable for you.
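
To make the difference concrete, here is a small, purely hypothetical sketch of 
the two tracking modes; this is not Streams' internal code, and the 
{{StreamTimeTracker}} class and its per-key switch are made up for illustration.

{noformat}
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker contrasting per-partition and per-key stream time.
// With per-key tracking, late-arriving records for key B no longer see the
// stream time already advanced by key A's records.
public class StreamTimeTracker {
    private final boolean perKey;                 // imagined config switch
    private long partitionTime = Long.MIN_VALUE;  // per-partition stream time
    private final Map<Object, Long> keyTimes = new HashMap<>();

    public StreamTimeTracker(boolean perKey) {
        this.perKey = perKey;
    }

    // Observe a record and return the stream time that applies to it.
    public long observe(Object key, long recordTimestamp) {
        if (perKey) {
            // Advance (or create) this key's own stream time.
            return keyTimes.merge(key, recordTimestamp, Math::max);
        }
        partitionTime = Math.max(partitionTime, recordTimestamp);
        return partitionTime;
    }
}
{noformat}

With the example in the issue description, per-partition tracking reports 
stream time 3 for all of B's records, while per-key tracking would advance B's 
stream time from t0 to t3 just as it did for A.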

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-11 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16971888#comment-16971888
 ] 

Richard Yu commented on KAFKA-8769:
---

[~vvcephei] [~mjsax] [~bbejeck] and anyone else who wishes to have some input: 
I have created KIP-540 for this issue. I would like some discussion on how we 
would go about implementing this system. 

Below is the link:

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-540%3A+Implement+per+key+stream+time+tracking]

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-05 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967926#comment-16967926
 ] 

Richard Yu commented on KAFKA-8769:
---

[~vvcephei] Do you have any ideas on how to advance stream time? Advancing it 
artificially seems unnatural, so I wonder if we can just move ahead with the 
per-key stream time part of the issue, since that seems to be the meat of it.

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-02 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965404#comment-16965404
 ] 

Richard Yu commented on KAFKA-8769:
---

To add on to how KIP-539 differs from KIP-424: KIP-424's sole intention was to 
add a suppress() operator that can flush on wall-clock time. KIP-539's original 
intention, on the other hand, was not to add such a method, but simply to flush 
out records that have stayed too long in the suppression buffer (again based on 
wall-clock time). 
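
For illustration, here is a minimal sketch of the kind of wall-clock eviction 
being discussed (the class name and the {{maxIdleMs}} parameter are made up for 
this example and are not part of any released Streams API):

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory suppression buffer that evicts entries which have
// sat in the buffer longer than maxIdleMs of *wall-clock* time, independent
// of stream time. Purely illustrative; not the actual Streams implementation.
public class WallClockSuppressionBuffer<K, V> {
    private static class Entry<V> {
        final V value;
        final long insertedAtMs;
        Entry(V value, long insertedAtMs) {
            this.value = value;
            this.insertedAtMs = insertedAtMs;
        }
    }

    private final long maxIdleMs;
    private final LinkedHashMap<K, Entry<V>> buffer = new LinkedHashMap<>();

    public WallClockSuppressionBuffer(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    public void put(K key, V value, long nowMs) {
        buffer.put(key, new Entry<>(value, nowMs));
    }

    // Emit and remove every record that has been buffered longer than maxIdleMs.
    public void evictExpired(long nowMs, BiConsumer<K, V> emit) {
        Iterator<Map.Entry<K, Entry<V>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Entry<V>> e = it.next();
            if (nowMs - e.getValue().insertedAtMs >= maxIdleMs) {
                emit.accept(e.getKey(), e.getValue().value);
                it.remove();
            }
        }
    }
}
{code}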

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-02 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965402#comment-16965402
 ] 

Richard Yu commented on KAFKA-8769:
---

Hi [~mjsax] 

I wasn't really aware of the suppress contract, so it hadn't occurred to me 
that suppressing based on wall-clock time would be a bad idea.

[~vvcephei] did indicate to me that KIP-424 had some overlapping use cases with 
this KIP, so I thought it would be convenient to introduce both of them at 
once. It is clear from your opinion, though, that you don't believe flushing 
based on wall-clock time would be ideal.

In that case, should we just proceed with implementing per-key stream time 
tracking first? The issue [~vvcephei] believed needed to be solved first was 
getting rid of the records that are stuck in the buffers somehow, but they can 
only be evicted on two time systems: stream time or wall clock. The second 
option is out. As for the first, how would we realistically advance stream 
time? Do we artificially increase it somehow? That doesn't seem right, since 
the records sent by the user are what determine it. So I'm at a loss as to how 
we should fix this low-traffic suppression issue. [~vvcephei] Your thoughts?

I do have a solution which avoids making the low-traffic suppression problem 
worse: maintain the old eviction behavior. That is, per-key stream time 
tracking assumed that we evict on _per key_ stream time, which would make the 
problem worse. But if we still evict on _per partition_ stream time, the 
eviction logic remains the same and the problem does not get any worse. In 
that case, we can use the finer granularity of per-key stream time tracking 
for calculating grace periods, but not for the suppression buffers.

[~mjsax] how does this sound?
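
To make this concrete, here is a rough sketch of what I have in mind (the names 
are made up for illustration; this is not the actual Streams code): stream time 
is tracked per key for grace-period checks, while the partition-level maximum 
still drives suppression-buffer eviction.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: tracks stream time per key for grace-period
// decisions, while eviction decisions still use the per-partition maximum,
// so the low-traffic suppression problem is not made any worse.
public class PerKeyStreamTimeTracker<K> {
    private final Map<K, Long> perKeyStreamTime = new HashMap<>();
    private long partitionStreamTime = Long.MIN_VALUE;

    // Called for every record observed on the partition.
    public void observe(K key, long recordTimestamp) {
        perKeyStreamTime.merge(key, recordTimestamp, Math::max);
        partitionStreamTime = Math.max(partitionStreamTime, recordTimestamp);
    }

    // Grace-period check uses the key's own stream time, so a key whose
    // records arrive "late" relative to other keys is not dropped.
    public boolean isWithinGrace(K key, long recordTimestamp, long gracePeriodMs) {
        long keyTime = perKeyStreamTime.getOrDefault(key, Long.MIN_VALUE);
        return keyTime == Long.MIN_VALUE
            || recordTimestamp >= keyTime - gracePeriodMs;
    }

    // Suppression eviction still keys off the partition-wide stream time.
    public long evictionStreamTime() {
        return partitionStreamTime;
    }
}
{code}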

 

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time an

[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2019-10-28 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961615#comment-16961615
 ] 

Richard Yu edited comment on KAFKA-8769 at 10/29/19 2:39 AM:
-

[~vvcephei]  [~mjsax]  Bumping the KIP. Want to see if we can get the KIP in.


was (Author: yohan123):
Bumping the KIP. Want to see if we can get the KIP in.

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-10-28 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961615#comment-16961615
 ] 

Richard Yu commented on KAFKA-8769:
---

Bumping the KIP. Want to see if we can get the KIP in.

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-10-24 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959308#comment-16959308
 ] 

Richard Yu commented on KAFKA-8522:
---

Found where the tombstones are inserted. That really just answered my question 
above. My bad. 

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever

2019-10-23 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958388#comment-16958388
 ] 

Richard Yu edited comment on KAFKA-8522 at 10/23/19 11:59 PM:
--

Hi [~junrao] [~hachikuji] Just have a question involving implementation of the 
KIP.

Where is the base timestamp for the RecordBatch (batch header v2) assigned its 
value? I'm asking because I'm having a bit of trouble locating where the base 
timestamp's value is assigned. 


was (Author: yohan123):
Hi [~junrao] [~hachikuji] Just have a question involving implementation of the 
KIP.

Where is the base timestamp for the RecordBatch (batch header v2) defined in 
Kafka? I'm asking because I'm having a bit of trouble locating where the base 
timestamp's value is assigned. 

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-10-23 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958388#comment-16958388
 ] 

Richard Yu commented on KAFKA-8522:
---

Hi [~junrao] [~hachikuji] Just have a question involving implementation of the 
KIP.

Where is the base timestamp for the RecordBatch (batch header v2) defined in 
Kafka? I'm asking because I'm having a bit of trouble locating where the base 
timestamp's value is assigned. 

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-10-23 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958182#comment-16958182
 ] 

Richard Yu commented on KAFKA-8769:
---

Hi all,

I have updated the KIP to include an extra feature as well, due to overlapping 
use cases.

It would be great if we can get some input. :)

 

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-8769) Consider computing stream time independently per key

2019-10-20 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8769:
--
Comment: was deleted

(was: [~vvcephei] Just a thought: would the low-traffic problem be made 
significantly worse by per-key stream time tracking?

I think it would be only marginally worse, if at all, than the current 
situation. What we could do to get around this is keep the logic in the 
suppression buffers the same as before, so that the problem does not get 
worse. That means that, yes, each individual key will have a different stream 
time, but when we evict records, we evict them based on the maximum of the 
stream times across all keys. So the low-traffic problem is not made worse; it 
stays the same.

I think this is acceptable. WDYT?)

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-10-20 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955611#comment-16955611
 ] 

Richard Yu commented on KAFKA-8769:
---

Alright, so I made a KIP for the low traffic problem here:

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-539%3A+Implement+mechanism+to+flush+out+records+in+low+traffic+suppression+buffers]

 

Hope this helps! :)

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-10-18 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955053#comment-16955053
 ] 

Richard Yu commented on KAFKA-8769:
---

[~vvcephei] Just a thought: would the low-traffic problem be made significantly 
worse by per-key stream time tracking?

I think it would be only marginally worse, if at all, than the current 
situation. What we could do to get around this is keep the logic in the 
suppression buffers the same as before, so that the problem does not get 
worse. That means that, yes, each individual key will have a different stream 
time, but when we evict records, we evict them based on the maximum of the 
stream times across all keys. So the low-traffic problem is not made worse; it 
stays the same.

I think this is acceptable. WDYT?

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8769) Consider computing stream time independently per key

2019-10-15 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8769:
--
Labels: needs-discussion needs-kip  (was: )

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-30 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16919901#comment-16919901
 ] 

Richard Yu commented on KAFKA-8522:
---

Oh, alright. I will see how things go and create a KIP then.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever

2019-08-29 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918980#comment-16918980
 ] 

Richard Yu edited comment on KAFKA-8522 at 8/29/19 10:06 PM:
-

[~junrao] I think I've hit a snag with your approach. The problem is that the 
partitions "assigned" to a LogCleaner can fluctuate after the LogCleaner 
instance is constructed: new TopicPartitions can be added to or removed from 
this "assignment". The consequence is that files would be created and removed 
far more often than is comfortable under certain conditions (not completely 
sure here).  

For details: I noticed that in the LogCleanerManager constructor, the {{logs}} 
parameter (the equivalent of the "assignment") is essentially a ConcurrentMap 
whose contents can change after initialization. That means files would also 
have to be repeatedly created and destroyed. Your thoughts on this?


was (Author: yohan123):
[~junrao] I think I've hit a caveat with your approach. The problem I've 
encountered here is that the partitions that are "assigned" to a LogCleaner 
could fluctuate after the LogCleaner instance is constructed. This has some 
implications because new TopicPartitions could be added to or removed from this 
"assignment". The consequences are that files are created and removed far more 
often than comfortable under certain conditions. 

For details, I noticed that in LogCleanerManager constructor, the {{logs}} 
parameter (the equivalent of the "assignment") is essentially a ConcurrentMap 
which can have its contents change after initialization. That means files also 
have to be repeatedly created and destroyed. Your thoughts on this?

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-29 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918985#comment-16918985
 ] 

Richard Yu commented on KAFKA-8522:
---

I have some thoughts on getting around this. The best approach I can think of: 
once a topic partition and its respective checkpoint file are created, we don't 
remove the entry from the partition-to-checkpoint map. Only when the 
LogCleanerManager instance is destroyed (with the exception of when 
{{partitions}} have their checkpoint files moved from one directory to another) 
do we remove all the files. 
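
As a rough illustration of that idea (the names are invented for this sketch, 
and the real LogCleanerManager is Scala, so the actual code looks different):

{code:java}
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: the partition-to-checkpoint-file map is only ever
// added to while the manager is alive, so entries survive fluctuations in the
// "assigned" partitions. Files are deleted only when the manager itself closes.
public class CheckpointFileRegistry {
    private final Map<String, File> checkpointByPartition = new ConcurrentHashMap<>();

    // Create (or reuse) the checkpoint file for a partition; the entry is never
    // removed just because the partition temporarily leaves the assignment.
    public File checkpointFor(String topicPartition, File logDir) {
        return checkpointByPartition.computeIfAbsent(
            topicPartition,
            tp -> new File(logDir, tp + ".cleaner-checkpoint"));
    }

    // Only on shutdown (or when a partition's checkpoint is being moved to
    // another directory) do we actually delete the files.
    public void close() {
        checkpointByPartition.values().forEach(File::delete);
        checkpointByPartition.clear();
    }
}
{code}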

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever

2019-08-29 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918980#comment-16918980
 ] 

Richard Yu edited comment on KAFKA-8522 at 8/29/19 10:01 PM:
-

[~junrao] I think I've hit a caveat with your approach. The problem I've 
encountered here is that the partitions that are "assigned" to a LogCleaner 
could fluctuate after the LogCleaner instance is constructed. This has some 
implications because new TopicPartitions could be added to or removed from this 
"assignment". The consequences are that files are created and removed far more 
often than comfortable under certain conditions. 

For details, I noticed that in LogCleanerManager constructor, the {{logs}} 
parameter (the equivalent of the "assignment") is essentially a ConcurrentMap 
which can have its contents change after initialization. That means files also 
have to be repeatedly created and destroyed. Your thoughts on this?


was (Author: yohan123):
@Jun Rao I think I've hit a caveat with your approach. The problem I've 
encountered here is that the partitions that are "assigned" to a LogCleaner 
could fluctuate after the LogCleaner instance is constructed. This has some 
implications because new TopicPartitions could be added to or removed from this 
"assignment". The consequences are that files are created and removed far more 
often than comfortable under certain conditions. 

For details, I noticed that in LogCleanerManager constructor, the {{logs}} 
parameter (the equivalent of the "assignment") is essentially a ConcurrentMap 
which can have its contents change after initialization. That means files also 
have to be repeatedly created and destroyed. Your thoughts on this?

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-29 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918980#comment-16918980
 ] 

Richard Yu commented on KAFKA-8522:
---

@Jun Rao I think I've hit a caveat with your approach. The problem I've 
encountered here is that the partitions that are "assigned" to a LogCleaner 
could fluctuate after the LogCleaner instance is constructed. This has some 
implications because new TopicPartitions could be added to or removed from this 
"assignment". The consequences are that files are created and removed far more 
often than comfortable under certain conditions. 

For details, I noticed that in LogCleanerManager constructor, the {{logs}} 
parameter (the equivalent of the "assignment") is essentially a ConcurrentMap 
which can have its contents change after initialization. That means files also 
have to be repeatedly created and destroyed. Your thoughts on this?

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-20 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911942#comment-16911942
 ] 

Richard Yu commented on KAFKA-8522:
---

Alright, sounds good.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-17 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909878#comment-16909878
 ] 

Richard Yu commented on KAFKA-8522:
---

So in summary, should I interpret your suggestion as creating multiple 
checkpoint files per logDir (where each logDir corresponds to a single disk)? 
In that case, each directory would hold several files. I guess that is probably 
what you had in mind.
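
If so, a rough sketch of the layout I am picturing (illustrative names only, not 
actual Kafka code):

{code:scala}
import java.io.File

// One small checkpoint file per partition inside each logDir (one logDir per disk),
// e.g. /data/kafka-logs-0/cleaner-offset-checkpoint-mytopic-3
def checkpointFileFor(logDir: File, topic: String, partition: Int): File =
  new File(logDir, s"cleaner-offset-checkpoint-$topic-$partition")
{code}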

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever

2019-08-17 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909876#comment-16909876
 ] 

Richard Yu edited comment on KAFKA-8522 at 8/18/19 3:22 AM:


The code below is located in LogManager.scala. I noticed that this is the code 
used to construct the LogManager:

 
{code:java}
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile), ...)
{code}
 

The variable {{config}} is a KafkaConfig instance. The {{logDirs}} you see here 
will be the ones that will eventually be used in {{LogCleanerManager}}. This 
was the place from which I drew the conclusion in my previous comment.


was (Author: yohan123):
The code below is located in LogManager.scala. I noticed that this is the code 
used to construct the LogManager:

 
{code:java}
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
  initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
  topicConfigs = topicConfigs,
  initialDefaultConfig = defaultLogConfig,
  cleanerConfig = cleanerConfig,
  recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
  flushCheckMs = config.logFlushSchedulerIntervalMs,
  flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
  flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
  retentionCheckMs = config.logCleanupIntervalMs,
  maxPidExpirationMs = config.transactionIdExpirationMs,
  scheduler = kafkaScheduler,
  brokerState = brokerState,
  brokerTopicStats = brokerTopicStats,
  logDirFailureChannel = logDirFailureChannel,
  time = time)
{code}
 

The variable {{config}} is a KafkaConfig instance. The {{logDirs}} you see here 
will be the ones that will eventually be used in {{LogCleanerManager}}. This 
was the place from which I drew the conclusion in my previous comment.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-17 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909876#comment-16909876
 ] 

Richard Yu commented on KAFKA-8522:
---

The code below is located in LogManager.scala. I noticed that this is the code 
used to construct the LogManager:

 
{code:java}
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
  initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
  topicConfigs = topicConfigs,
  initialDefaultConfig = defaultLogConfig,
  cleanerConfig = cleanerConfig,
  recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
  flushCheckMs = config.logFlushSchedulerIntervalMs,
  flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
  flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
  retentionCheckMs = config.logCleanupIntervalMs,
  maxPidExpirationMs = config.transactionIdExpirationMs,
  scheduler = kafkaScheduler,
  brokerState = brokerState,
  brokerTopicStats = brokerTopicStats,
  logDirFailureChannel = logDirFailureChannel,
  time = time)
{code}
 

The variable {{config}} is a KafkaConfig instance. The {{logDirs}} you see here 
will be the ones that will eventually be used in {{LogCleanerManager}}. This 
was the place from which I drew the conclusion in my previous comment.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever

2019-08-17 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909866#comment-16909866
 ] 

Richard Yu edited comment on KAFKA-8522 at 8/18/19 3:08 AM:


[~junrao] Just want some clarification on something. When 
{{LogCleanerManager}} is created, I noticed that a {{logDirs}} parameter is 
passed in: a sequence of files the program can write into. These files' paths 
are given by the user in the {{logDirs}} property located in {{KafkaConfig}}.

It was here that I got a little confused. Firstly, I assumed that the files are 
limited by the user to one per disk (since KafkaConfig's values should be 
controlled by the user). This is where I think there could be a problem. In a 
real-world situation, the number of partitions typically exceeds the number of 
disks available. In other words, if we pair off one checkpoint file per 
partition, some partitions would have no checkpoint file to write to.

Is my understanding of the situation correct? I am not completely sure about 
this, but this was how it appeared to me.


was (Author: yohan123):
[~junrao] Just want some clarification on something. When 
{{LogCleanerManager}} is created, I noticed that a {{logDirs}} parameter is 
passed in (a sequence of {{File}}s) that is used to determine the checkpoint 
files a person can write into. After some research, I discovered that these 
files' paths are given by KafkaConfig's {{logDirs}} property. 

It was here that I got a little confused. Firstly, I assumed that the files are 
limited by the user to one per disk (since KafkaConfig's values should be 
controlled by the user). This is where I think there could be a problem. In a 
real-world situation, the number of partitions typically exceeds the number of 
disks available. In other words, if we pair off one checkpoint file per 
partition, some partitions would have no checkpoint file to write to.

Is my understanding of the situation correct? I am not completely sure about 
this, but this was how it appeared to me.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8770) Either switch to or add an option for emit-on-change

2019-08-17 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8770:
--
Labels: needs-kip  (was: )

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-8522) Tombstones can survive forever

2019-08-17 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909866#comment-16909866
 ] 

Richard Yu edited comment on KAFKA-8522 at 8/18/19 2:18 AM:


[~junrao] Just want some clarification on something. When 
{{LogCleanerManager}} is created, I noticed that a {{logDirs}} parameter is 
passed in (a sequence of {{File}}s) that is used to determine the checkpoint 
files a person can write into. After some research, I discovered that these 
files' paths are given by KafkaConfig's {{logDirs}} property. 

It was here that I got a little confused. Firstly, I assumed that the files are 
limited by the user to one per disk (since KafkaConfig's values should be 
controlled by the user). This is where I think there could be a problem. In a 
real-world situation, the number of partitions typically exceeds the number of 
disks available. In other words, if we pair off one checkpoint file per 
partition, some partitions would have no checkpoint file to write to.

Is my understanding of the situation correct? I am not completely sure about 
this, but this was how it appeared to me.


was (Author: yohan123):
[~junrao] Just want some clarification on something. When 
{{LogCleanerManager}} is created, I noticed that a {{logDirs}} parameter is 
passed in (a sequence of {{File}}s) that is used to determine the checkpoint 
files a person can write into. After some research, I discovered that these 
files' paths are given by KafkaConfig's {{logDirs}} property. 

It was here that I got a little confused. Firstly, I assumed that the files are 
limited by the user to one per disk (since KafkaConfig's values should be 
controlled by the user). This is where I think there could be a problem. In a 
real-world situation, the number of partitions typically exceeds the number of 
disks available. In other words, if we pair off one checkpoint file per 
partition, some partitions would have no checkpoint file to write to.

Is my understanding of the situation correct? I am not completely sure about 
this, but this was how it appeared to me.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-17 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909866#comment-16909866
 ] 

Richard Yu commented on KAFKA-8522:
---

[~junrao] Just want some clarification on something. When 
{{LogCleanerManager}} is created, I noticed that a {{logDirs}} parameter is 
passed in (a sequence of {{File}}s) that is used to determine the checkpoint 
files a person can write into. After some research, I discovered that these 
files' paths are given by KafkaConfig's {{logDirs}} property. 

It was here that I got a little confused. Firstly, I assumed that the files are 
limited by the user to one per disk (since KafkaConfig's values should be 
controlled by the user). This is where I think there could be a problem. In a 
real-world situation, the number of partitions typically exceeds the number of 
disks available. In other words, if we pair off one checkpoint file per 
partition, some partitions would have no checkpoint file to write to.

Is my understanding of the situation correct? I am not completely sure about 
this, but this was how it appeared to me.
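
For reference, my rough mental model of the current layout, which may not match 
the exact on-disk format, is a single cleaner-offset-checkpoint file per log dir 
that already carries one entry per partition, along these lines:

{noformat}
0              <- format version
2              <- number of entries
topicA 0 42    <- topic, partition, first dirty offset
topicB 3 100
{noformat}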

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8806) Kafka.poll spends significant amount of time in KafkaConsumer.updateAssignmentMetadataIfNeeded

2019-08-16 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16909445#comment-16909445
 ] 

Richard Yu commented on KAFKA-8806:
---

[~vvcephei] Since you wrote this part of the code, your thoughts on this?

> Kafka.poll spends significant amount of time in 
> KafkaConsumer.updateAssignmentMetadataIfNeeded
> --
>
> Key: KAFKA-8806
> URL: https://issues.apache.org/jira/browse/KAFKA-8806
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Xavier Léauté
>Assignee: David Arthur
>Priority: Major
>
> Comparing the performance profile of 2.2.0 and 2.3.0, we are seeing 
> significant performance differences in the 
> {{KafkaConsumer.updateAssignmentMetadataIfNeeded()}} method.
> The call to {{KafkaConsumer.updateAssignmentMetadataIfNeeded()}} now 
> represents roughly 40% of CPU time spent in {{KafkaConsumer.poll()}}, when 
> before it only represented less than 2%.
> Most of the extra time appears to be spent in 
> {{KafkaConsumer.validateOffsetsIfNeeded()}}, which did not show up in 
> previous profiles.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2019-08-13 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906692#comment-16906692
 ] 

Richard Yu edited comment on KAFKA-8769 at 8/13/19 10:50 PM:
-

Notably, a problem with this approach is that keys which are evicted from the 
cache will no longer have their timestamps tracked. If we receive a record 
whose key has no tracked timestamp, then we could insert the key into the cache 
with that record's timestamp as that particular key's largest time so far. 
Admittedly, the order in which we receive records does not guarantee 
monotonically increasing timestamps, but the latest record we receive and its 
timestamp are a fair approximation.


was (Author: yohan123):
Notably, a problem with this approach is that keys which are evicted from the 
cache will no longer have their timestamps tracked. If we receive a record 
whose key has no tracked timestamp, then we could insert the key into the cache 
with that record's timestamp as that particular key's partition time. 
Admittedly, the order in which we receive records does not guarantee 
monotonically increasing timestamps, but the latest record we receive and its 
timestamp are a fair approximation.

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-08-13 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906692#comment-16906692
 ] 

Richard Yu commented on KAFKA-8769:
---

Notably, a problem with this approach is that keys which are evicted from the 
cache will no longer have their timestamps tracked. If we receive a record 
whose key has no tracked timestamp, then we could insert the key into the cache 
with that record's timestamp as that particular key's partition time. 
Admittedly, the order in which we receive records does not guarantee 
monotonically increasing timestamps, but the latest record we receive and its 
timestamp are a fair approximation.
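
For illustration, a minimal sketch of the kind of bounded cache I have in mind 
(all names are hypothetical; this is not existing Streams code):

{code:scala}
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Bounded, access-ordered map: the least recently used key is evicted
// once we track more than maxKeys keys.
class PerKeyTimeCache[K](maxKeys: Int) {
  private val cache = new JLinkedHashMap[K, java.lang.Long](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[K, java.lang.Long]): Boolean =
      size() > maxKeys
  }

  // Returns the key's tracked time; on a miss (a new or evicted key) we fall
  // back to the record's own timestamp, as described above.
  def observe(key: K, recordTimestamp: Long): Long = {
    val known = Option(cache.get(key)).map(_.longValue).getOrElse(recordTimestamp)
    val updated = math.max(known, recordTimestamp)
    cache.put(key, updated)
    updated
  }
}
{code}

An LFU variant would only change the eviction policy; the important part is the 
fallback to the record's own timestamp on a miss.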

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2019-08-13 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906674#comment-16906674
 ] 

Richard Yu edited comment on KAFKA-8769 at 8/13/19 10:29 PM:
-

[~vvcephei] On the issue regarding low traffic: would this mechanism 
necessarily be a public API and what could it look like?  Just wondering if 
this requires a KIP.

As for the scalability of the key-based partition time tracking, just some 
thoughts at the moment: we don't have to store _all_ the keys and their 
timestamps. That would be wasteful if a significant portion of them is not used 
often at any particular time. Instead, we could use some sort of cache (maybe 
LFU or LRU) which stores the most popular keys at a certain period in a 
StreamTask's lifetime.  


was (Author: yohan123):
[~vvcephei] On the issue regarding low traffic: would this mechanism 
necessarily be a public API and what could it look like?  Just wondering if 
this requires a KIP.

As for the scalability of the key-based partition time tracking, just some 
thoughts at the moment: we don't have to store _all_ the keys and their 
timestamps. That would be wasteful if a significant portion of them is not used 
often at any particular time. Instead, we could use some sort of cache (maybe 
LFU or LRU) which stores the most popular keys at a certain period in time.  

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-08-13 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906674#comment-16906674
 ] 

Richard Yu commented on KAFKA-8769:
---

[~vvcephei] On the issue regarding low traffic: would this mechanism 
necessarily be a public API and what could it look like?  Just wondering if 
this requires a KIP.

As for the scalability of the key-based partition time tracking, just some 
thoughts at the moment: we don't have to store _all_ the keys and their 
timestamps. That would be wasteful if a significant portion of them is not used 
often at any particular time. Instead, we could use some sort of cache (maybe 
LFU or LRU) which stores the most popular keys at a certain period in time.  

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-12 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16905747#comment-16905747
 ] 

Richard Yu commented on KAFKA-8522:
---

[~jagsancio] any thoughts?

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-12 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16905377#comment-16905377
 ] 

Richard Yu commented on KAFKA-8522:
---

[~junrao] A thought just occurred to me. Wouldn't we need to provide an upgrade 
path if we wish to deprecate the old checkpoint file system? If so, then how 
would we go about implementing it?

I'm not completely sure if we need one at the moment, but I'd think we do. 

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-11 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904814#comment-16904814
 ] 

Richard Yu commented on KAFKA-8522:
---

Okay, solved the above problem. Will start with reorganizing the checkpoint 
files into separate ones for individual partitions.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-11 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904804#comment-16904804
 ] 

Richard Yu commented on KAFKA-8522:
---

Alright, so I have found the necessary logic for the checkpoint files and the 
classes in which they are managed (LogCleaner and LogManager).

The current problem I have is figuring out where the cleaning offsets are 
committed to the checkpoint files. Still looking into it at the moment. 

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-07 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16902209#comment-16902209
 ] 

Richard Yu commented on KAFKA-8522:
---

[~junrao] You mentioned in a previous post that we should write to a checkpoint 
file. Should we create a new checkpoint file explicitly for storing these 
offsets and cleaning times? Or should this information be embedded in a 
preexisting checkpoint file?  

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned

2019-08-02 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16899331#comment-16899331
 ] 

Richard Yu commented on KAFKA-4545:
---

Sure, I will assign this issue to myself then. I will get around to it soon; I 
just have a pending PR that needs to be merged to trunk.

[~junrao] Thanks for the heads-up. We should probably add a needs-kip label.

> tombstone needs to be removed after delete.retention.ms has passed after it 
> has been cleaned
> 
>
> Key: KAFKA-4545
> URL: https://issues.apache.org/jira/browse/KAFKA-4545
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Jun Rao
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> The algorithm for removing the tombstone in a compacted topic is supposed to be the 
> following.
> 1. Tombstone is never removed when it's still in the dirty portion of the log.
> 2. After the tombstone is in the cleaned portion of the log, we further delay 
> the removal of the tombstone by delete.retention.ms since the time the 
> tombstone is in the cleaned portion.
> Once the tombstone is in the cleaned portion, we know there can't be any 
> message with the same key before the tombstone. Therefore, for any consumer, 
> if it reads a non-tombstone message before the tombstone, but can read to the 
> end of the log within delete.retention.ms, it's guaranteed to see the 
> tombstone.
> However, the current implementation doesn't seem correct. We delay the 
> removal of the tombstone by delete.retention.ms since the last modified time 
> of the last cleaned segment. However, the last modified time is inherited 
> from the original segment, which could be arbitrarily old. So, the tombstone 
> may not be preserved as long as it needs to be.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned

2019-08-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-4545:
--
Labels: needs-kip  (was: )

> tombstone needs to be removed after delete.retention.ms has passed after it 
> has been cleaned
> 
>
> Key: KAFKA-4545
> URL: https://issues.apache.org/jira/browse/KAFKA-4545
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Jun Rao
>Assignee: Richard Yu
>Priority: Major
>  Labels: needs-kip
>
> The algorithm for removing the tombstone in a compacted topic is supposed to be the 
> following.
> 1. Tombstone is never removed when it's still in the dirty portion of the log.
> 2. After the tombstone is in the cleaned portion of the log, we further delay 
> the removal of the tombstone by delete.retention.ms since the time the 
> tombstone is in the cleaned portion.
> Once the tombstone is in the cleaned portion, we know there can't be any 
> message with the same key before the tombstone. Therefore, for any consumer, 
> if it reads a non-tombstone message before the tombstone, but can read to the 
> end of the log within delete.retention.ms, it's guaranteed to see the 
> tombstone.
> However, the current implementation doesn't seem correct. We delay the 
> removal of the tombstone by delete.retention.ms since the last modified time 
> of the last cleaned segment. However, the last modified time is inherited 
> from the original segment, which could be arbitrarily old. So, the tombstone 
> may not be preserved as long as it needs to be.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned

2019-08-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-4545:
-

Assignee: Richard Yu  (was: Jose Armando Garcia Sancio)

> tombstone needs to be removed after delete.retention.ms has passed after it 
> has been cleaned
> 
>
> Key: KAFKA-4545
> URL: https://issues.apache.org/jira/browse/KAFKA-4545
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Jun Rao
>Assignee: Richard Yu
>Priority: Major
>
> The algorithm for removing the tombstone in a compacted topic is supposed to be the 
> following.
> 1. Tombstone is never removed when it's still in the dirty portion of the log.
> 2. After the tombstone is in the cleaned portion of the log, we further delay 
> the removal of the tombstone by delete.retention.ms since the time the 
> tombstone is in the cleaned portion.
> Once the tombstone is in the cleaned portion, we know there can't be any 
> message with the same key before the tombstone. Therefore, for any consumer, 
> if it reads a non-tombstone message before the tombstone, but can read to the 
> end of the log within delete.retention.ms, it's guaranteed to see the 
> tombstone.
> However, the current implementation doesn't seem correct. We delay the 
> removal of the tombstone by delete.retention.ms since the last modified time 
> of the last cleaned segment. However, the last modified time is inherited 
> from the original segment, which could be arbitrarily old. So, the tombstone 
> may not be preserved as long as it needs to be.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned

2019-07-29 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895651#comment-16895651
 ] 

Richard Yu commented on KAFKA-4545:
---

[~jagsancio] Are you still working on this? I'm not too sure, because it doesn't 
look like there has been much work on it since the ticket was created.

> tombstone needs to be removed after delete.retention.ms has passed after it 
> has been cleaned
> 
>
> Key: KAFKA-4545
> URL: https://issues.apache.org/jira/browse/KAFKA-4545
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Jun Rao
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> The algorithm for removing the tombstone in a compacted topic is supposed to be the 
> following.
> 1. Tombstone is never removed when it's still in the dirty portion of the log.
> 2. After the tombstone is in the cleaned portion of the log, we further delay 
> the removal of the tombstone by delete.retention.ms since the time the 
> tombstone is in the cleaned portion.
> Once the tombstone is in the cleaned portion, we know there can't be any 
> message with the same key before the tombstone. Therefore, for any consumer, 
> if it reads a non-tombstone message before the tombstone, but can read to the 
> end of the log within delete.retention.ms, it's guaranteed to see the 
> tombstone.
> However, the current implementation doesn't seem correct. We delay the 
> removal of the tombstone by delete.retention.ms since the last modified time 
> of the last cleaned segment. However, the last modified time is inherited 
> from the original segment, which could be arbitrarily old. So, the tombstone 
> may not be preserved as long as it needs to be.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-07-16 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16886516#comment-16886516
 ] 

Richard Yu commented on KAFKA-8522:
---

[~junrao] Would you mind if I tackle this issue?

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What happens is that all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.
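
As a rough illustration of the behaviour described above, the sketch below models how continuous writes into the oldest segment keep refreshing its last-modified time, so the delete.retention.ms countdown never elapses. All names are hypothetical; this is not the actual log cleaner logic.

{code}
// Hypothetical model of the behaviour described above: each cleaning pass rewrites
// the oldest segment, refreshing its last-modified time, so the tombstone-retention
// countdown (delete.retention.ms) never elapses and tombstones are never dropped.
public class ForeverTombstones {
    public static void main(String[] args) {
        long deleteRetentionMs = 60_000L;
        long segmentLastModifiedMs = 0L;
        for (long now = 0; now <= 300_000L; now += 30_000L) {
            // low throughput + dirty ratio near 0 => the cleaner rewrites the oldest
            // segment on every pass, bumping its last-modified time to "now"
            segmentLastModifiedMs = now;
            boolean tombstonesEligible = now - segmentLastModifiedMs > deleteRetentionMs;
            System.out.printf("t=%dms eligibleForDeletion=%b%n", now, tombstonesEligible);
        }
    }
}
{code}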



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-7711) Add a bounded flush() API to Kafka Producer

2019-07-15 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885795#comment-16885795
 ] 

Richard Yu edited comment on KAFKA-7711 at 7/16/19 3:01 AM:


[~hachikuji] I think I've seen this issue come up multiple times in the past, 
though I'm not sure exactly which issue numbers. Do you think this is an issue 
that really needs to be tackled?

Edit: I got it somewhat mixed up. There have been issues with Kafka's send API in 
the past (for example, KAFKA-6705), but not with the flush() API. It might still 
be good to have something like it, though.

 


was (Author: yohan123):
[~hachikuji] I think I've seen this issue come up multiple times in the past. 
Not sure exactly which issue numbers. Do you think this is a issue that really 
needs to be tackled?

> Add a bounded flush()  API to Kafka Producer
> 
>
> Key: KAFKA-7711
> URL: https://issues.apache.org/jira/browse/KAFKA-7711
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: kun du
>Priority: Minor
>  Labels: needs-kip
>
> Currently the call to Producer.flush() can hang for an indeterminate amount of 
> time.
> It would be a good idea to add a bounded flush() API that times out if the 
> producer is unable to flush all the batched records within a limited time. In 
> this way the caller of flush() has a chance to decide what to do next instead 
> of just waiting forever.
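
Until such an API exists, a bounded flush can be approximated on the caller side. Below is a minimal sketch of that workaround (running the unbounded flush() on a separate thread and bounding the wait with Future.get); it is a caller-side approximation, not a Producer API.

{code}
import java.util.Properties;
import java.util.concurrent.*;
import org.apache.kafka.clients.producer.KafkaProducer;

// Caller-side sketch: run the unbounded flush() on another thread and bound the wait
// with Future.get(timeout). Note that close() below can still block; this sketch only
// bounds the flush() wait itself.
public class BoundedFlushExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<?> flush = executor.submit(producer::flush);
            try {
                flush.get(30, TimeUnit.SECONDS);   // bounded wait
            } catch (TimeoutException e) {
                // the caller gets a chance to decide what to do next instead of waiting forever
                System.err.println("flush() did not complete within 30s");
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}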



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7711) Add a bounded flush() API to Kafka Producer

2019-07-15 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885795#comment-16885795
 ] 

Richard Yu commented on KAFKA-7711:
---

[~hachikuji] I think I've seen this issue come up multiple times in the past, 
though I'm not sure exactly which issue numbers. Do you think this is an issue 
that really needs to be tackled?

> Add a bounded flush()  API to Kafka Producer
> 
>
> Key: KAFKA-7711
> URL: https://issues.apache.org/jira/browse/KAFKA-7711
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: kun du
>Priority: Minor
>  Labels: needs-kip
>
> Currently the call to Producer.flush() can hang for an indeterminate amount of 
> time.
> It would be a good idea to add a bounded flush() API that times out if the 
> producer is unable to flush all the batched records within a limited time. In 
> this way the caller of flush() has a chance to decide what to do next instead 
> of just waiting forever.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884826#comment-16884826
 ] 

Richard Yu commented on KAFKA-8650:
---

[~mjsax] Do you think this ticket is something that the Kafka community would need 
within the next release? It would be good to get a gauge on this issue's priority.

> Streams does not work as expected with auto.offset.reset=none
> -
>
> Key: KAFKA-8650
> URL: https://issues.apache.org/jira/browse/KAFKA-8650
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Raman Gupta
>Priority: Major
>  Labels: needs-kip
>
> The auto.offset.reset policy of none is useful as a safety measure, 
> especially when 
> * exactly-once processing is desired, or
> * at-least-once is desired, but it is expensive to reprocess from the 
> beginning.
> In this case, using "none" forces the ops team to explicitly set the offset 
> before the stream can re-start processing, in the (hopefully rare) situations 
> in which the stream consumer offset has been lost for some reason, or in the 
> case of a new stream that should not start processing from the beginning or 
> the end, but somewhere in the middle (this scenario might occur during topic 
> migrations).
> Kafka streams really only supports auto.offset.reset of earliest or latest 
> (see the `Topology.AutoOffsetReset` enum). It is also possible to use the 
> auto.offset.reset configuration value, but this works suboptimally because if 
> the streams application reset tool is used (even with a specific offset 
> specified), the offset is set for the input topic, but it is not, and cannot 
> be, set for the internal topics, which won't exist yet.
> The internal topics are created by Kafka streams at startup time, but because 
> the auto.offset.reset policy of "none" is passed to the consumer of those 
> internal topics, the Kafka stream fails to start with a 
> "NoOffsetForPartitionException".
> Proposals / options:
> 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it 
> affects the input topics, but not the internal topics.
> 2) Allow streams to be configured with auto.offset.reset=none, but explicitly 
> set the offset to 0 for newly created internal topics.
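
For reference, here is a sketch of how the per-input-topic reset policy is expressed today via Topology.AutoOffsetReset, which only offers EARLIEST and LATEST; a NONE value as in proposal (1) is hypothetical and does not exist yet.

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

// Sketch of how the per-topic reset policy is set today: only EARLIEST and LATEST are
// available via Topology.AutoOffsetReset. Proposal (1) would add something like a NONE
// value here (hypothetical), applying to the input topic but not the internal topics.
public class ResetPolicyExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream(
            "input-topic",
            Consumed.with(Serdes.String(), Serdes.String())
                    .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
        input.to("output-topic");
        return builder.build();
    }
}
{code}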



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8658) A way to configure the jmx rmi port

2019-07-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884810#comment-16884810
 ] 

Richard Yu commented on KAFKA-8658:
---

Alright, so here is the PR for this issue.

[https://github.com/apache/kafka/pull/7088]

> A way to configure the jmx rmi port
> ---
>
> Key: KAFKA-8658
> URL: https://issues.apache.org/jira/browse/KAFKA-8658
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 1.0.0
> Environment: Centos 7
>Reporter: Agostino Sarubbo
>Priority: Minor
>
> Hello,
> I'm on kafka-1.0.0, so I'm not sure if this is fixed in the current version.
> At the moment we are using the following in the service script to enable JMX:
> Environment=JMX_PORT=7666
> However, there is no way to set the JMX RMI port. When jmx_rmi_port is not 
> specified, the JVM assigns a random port, which complicates the way we manage 
> the firewall.
> It would be great if there were a way to set the JMX RMI port in the same way, 
> e.g.:
> Environment=JMX_RMI_PORT=7667
> The variable used during the jvm start is: 
> -Dcom.sun.management.jmxremote.rmi.port=



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8369) Generate an immutable Map view for generated messages with a map key

2019-07-07 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879987#comment-16879987
 ] 

Richard Yu commented on KAFKA-8369:
---

Hi [~hachikuji],

I've done some research into this issue, and something has come up that requires 
some clarification.

At the moment, the find() API in ImplicitLinkedHashCollection takes a {{key}} 
input argument. This {{key}} is used to search the set for an object with the 
exact same hash code, and that object is returned. So my question is: what 
exactly are the key and value for the immutable map view?

I ask because it doesn't seem clear what key-value pairs exist. The closest 
reading I could come up with is the {{key}} input argument being the actual key, 
and the elements stored in the set being the values. Is that the way this issue 
is to be approached?

I'm not so sure at the moment, so it would be great to get your input. :)
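
To make the question concrete, here is a rough sketch of that reading (the {{key}} as the map key, the stored element as the value). The Keyed interface and asImmutableMap helper are hypothetical placeholders, not the real generated-message classes, and the sketch copies into an unmodifiable map rather than wrapping find() directly.

{code}
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the "key -> stored element" reading: the element's key field
// becomes the map key, and the stored element itself becomes the value. This copies
// into an unmodifiable map; a true view would delegate lookups to find() instead.
final class ImmutableMapViewSketch {

    interface Keyed<K> {
        K key();   // placeholder accessor for the element's key field
    }

    static <K, E extends Keyed<K>> Map<K, E> asImmutableMap(Iterable<E> elements) {
        Map<K, E> view = new LinkedHashMap<>();
        for (E element : elements) {
            view.put(element.key(), element);   // map key = element's key, value = element
        }
        return Collections.unmodifiableMap(view);
    }
}
{code}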

> Generate an immutable Map view for generated messages with a map key
> 
>
> Key: KAFKA-8369
> URL: https://issues.apache.org/jira/browse/KAFKA-8369
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> When using the "mapKey" feature, we get an ImplicitLinkedHashCollection which 
> can be used like a map using the `find()` API. The benefit of this is 
> hopefully avoiding a conversion to another type when handled by the broker, 
> but it is a little cumbersome to work with, so we often end up doing the 
> conversion anyway. One improvement would be to provide a way to convert this 
> collection to an immutable Map view so that it is easier to work with 
> directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-10 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860591#comment-16860591
 ] 

Richard Yu edited comment on KAFKA-8516 at 6/11/19 6:16 AM:


Well, this is when we start straying into an area called "consensus 
algorithms". Kafka's current leader-replica model loosely follows an algorithm 
referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). 
If we wish to implement the write permissions part (which looks like a pretty 
big if), then we would perhaps have to consider something along the lines of 
EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ).

cc [~hachikuji] your thoughts on this?

Edit: If implemented correctly, there should be a pretty good performance gain 
(the results in the Raft paper, at least, seem to indicate this).


was (Author: yohan123):
Well, this is when we start straying into an area called "consensus 
algorithms". Kafka's current leader-replica model closely follows an algorithm 
referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). 
If we wish to implement the write permissions part (which looks like a pretty 
big if), then we would perhaps have to consider something along the lines of 
EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ).

cc [~hachikuji] your thoughts on this?

Edit: If implemented correctly, there should be a pretty good performance gain 
(results in RAFT paper anyways seem to indicate this).

> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all replicas are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.
> Relevant KIP for read permissions can be found here:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-10 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860591#comment-16860591
 ] 

Richard Yu edited comment on KAFKA-8516 at 6/11/19 6:15 AM:


Well, this is when we start straying into an area called "consensus 
algorithms". Kafka's current leader-replica model closely follows an algorithm 
referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). 
If we wish to implement the write permissions part (which looks like a pretty 
big if), then we would perhaps have to consider something along the lines of 
EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ).

cc [~hachikuji] your thoughts on this?

Edit: If implemented correctly, there should be a pretty good performance gain 
(results in RAFT paper anyways seem to indicate this).


was (Author: yohan123):
Well, this is when we start straying into an area called "consensus 
algorithms". Kafka's current leader-replica model closely follows an algorithm 
referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). 
If we wish to implement the write permissions part (which looks like a pretty 
big if), then we would perhaps have to consider something along the lines of 
EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ). 

cc [~hachikuji] your thoughts on this?

> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all replicas are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.
> Relevant KIP for read permissions can be found here:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-10 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860591#comment-16860591
 ] 

Richard Yu commented on KAFKA-8516:
---

Well, this is when we start straying into an area called "consensus 
algorithms". Kafka's current leader-replica model closely follows an algorithm 
referred to as Raft (research paper here: [https://raft.github.io/raft.pdf] ). 
If we wish to implement the write permissions part (which looks like a pretty 
big if), then we would perhaps have to consider something along the lines of 
EPaxos ( [https://www.cs.cmu.edu/~dga/papers/epaxos-sosp2013.pdf] ). 

cc [~hachikuji] your thoughts on this?

> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all replicas are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.
> Relevant KIP for read permissions can be found here:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-10 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8516:
--
Description: 
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
  
 This has multiple positives. Notably the following:
 * Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all replicas are equal)
 * Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.

The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.

Relevant KIP for read permissions can be found here:

[KIP-392|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]]

  was:
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
  
 This has multiple positives. Notably the following:
 * Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all replicas are equal)
 * Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.

The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.


> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all replicas are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.
> Relevant KIP for read permissions can be found here:
> [KIP-392|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-10 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8516:
--
Description: 
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
  
 This has multiple positives. Notably the following:
 * Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all replicas are equal)
 * Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.

The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.

Relevant KIP for read permissions can be found here:

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]

  was:
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
  
 This has multiple positives. Notably the following:
 * Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all replicas are equal)
 * Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.

The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.

Relevant KIP for read permissions can be found here:

[KIP-392|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]]


> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all replicas are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.
> Relevant KIP for read permissions can be found here:
> [https://cwiki.

[jira] [Commented] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-10 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860350#comment-16860350
 ] 

Richard Yu commented on KAFKA-8516:
---

Thanks for the heads up! I will add the link to the issue.

They are pretty close, but I think that KIP only covers the addition of read 
permissions for replicas, not write permissions (i.e. fetches = reads). I think 
implementing read permissions for all replicas is considerably easier than write 
permissions, since for writes we have to guarantee consistency.

> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all replicas are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-09 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8516:
--
Description: 
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
  
 This has multiple positives. Notably the following:
 * Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all replicas are equal)
 * Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.

The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.

  was:
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
  
 This has multiple positives. Notably the following:
 * Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all partitions are equal)
 * Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.

The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.


> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all replicas are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-09 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8516:
--
Description: 
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
  
 This has multiple positives. Notably the following:
 * Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all partitions are equal)
 * Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.

The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.

  was:
Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
 
This has multiple positives. Notably the following:
- Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all partitions are equal)
- Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.
 
The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.


> Consider allowing all replicas to have read/write permissions
> -
>
> Key: KAFKA-8516
> URL: https://issues.apache.org/jira/browse/KAFKA-8516
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Yu
>Priority: Major
>
> Currently, in Kafka internals, a leader is responsible for all the read and 
> write operations requested by the user. This naturally incurs a bottleneck 
> since one replica, as the leader, would experience a significantly heavier 
> workload than other replicas and also means that all client commands must 
> pass through a chokepoint. If a leader fails, all processing effectively 
> comes to a halt until another leader election. In order to help solve this 
> problem, we could think about redesigning Kafka core so that any replica is 
> able to do read and write operations as well. That is, the system be changed 
> so that _all_ replicas have read/write permissions.
>   
>  This has multiple positives. Notably the following:
>  * Workload can be more evenly distributed since leader replicas are weighted 
> more than follower replicas (in this new design, all partitions are equal)
>  * Some failures would not be as catastrophic as in the leader-follower 
> paradigm. There is no one single "leader". If one replica goes down, others 
> are still able to read/write as needed. Processing could continue without 
> interruption.
> The implementation for such a change like this will be very extensive and 
> discussion would be needed to decide if such an improvement as described 
> above would warrant such a drastic redesign of Kafka internals.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-09 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8516:
-

 Summary: Consider allowing all replicas to have read/write 
permissions
 Key: KAFKA-8516
 URL: https://issues.apache.org/jira/browse/KAFKA-8516
 Project: Kafka
  Issue Type: Improvement
Reporter: Richard Yu


Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally incurs a bottleneck 
since one replica, as the leader, would experience a significantly heavier 
workload than other replicas and also means that all client commands must pass 
through a chokepoint. If a leader fails, all processing effectively comes to a 
halt until another leader election. In order to help solve this problem, we 
could think about redesigning Kafka core so that any replica is able to do read 
and write operations as well. That is, the system be changed so that _all_ 
replicas have read/write permissions.
 
This has multiple positives. Notably the following:
- Workload can be more evenly distributed since leader replicas are weighted 
more than follower replicas (in this new design, all partitions are equal)
- Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no one single "leader". If one replica goes down, others are 
still able to read/write as needed. Processing could continue without 
interruption.
 
The implementation for such a change like this will be very extensive and 
discussion would be needed to decide if such an improvement as described above 
would warrant such a drastic redesign of Kafka internals.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-06-05 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857147#comment-16857147
 ] 

Richard Yu commented on KAFKA-8421:
---

I think we could extend this issue to account for a poll() operation which did 
not begin during a rebalance but extends into it. Basically, we want to break the 
timeout up so that we can "join" the group: technically the consumer has been 
given its new assignment, but it will not receive it (in the form of the 
SyncGroup and JoinGroup responses) until its fetch request is complete.

Edit to previous comment: We could also halt any ongoing fetch requests should 
the rebalance timeout hit its limit and return the data we already have. Once 
every fetch request has completed, we would send all JoinGroup responses 
afterwards (in other words, while a fetch request for previously owned 
partitions is still live, we would hold off on sending any JoinGroup responses, 
to avoid two consumers requesting records from the same partition). 

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send a join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return 
> messages that belong to their owned partitions even when they are within a 
> rebalance, because we know it is safe: no one else would claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout, i.e. while 
> consumers are processing those records they may not call the next poll() in time 
> (think: the Kafka Streams num.iterations mechanism), which may lead to the 
> consumer dropping out of the group during the rebalance.
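
As a rough, hypothetical sketch of that optimization (the types below are simplified stand-ins, not the real KafkaConsumer internals):

{code}
import java.util.Set;

// Hypothetical sketch of the optimization described above; the types are simplified
// placeholders, not the real KafkaConsumer internals.
final class PollDuringRebalanceSketch {

    /** Stand-in for ConsumerRecords. */
    interface Records { }

    // Today: a rebalance in progress always yields an empty result. Proposed: data for
    // partitions this consumer still owns could be returned, since no other member can
    // claim them before the rebalance completes, and their offsets can still be
    // committed if they end up being revoked afterwards.
    static Records pollResult(boolean rebalanceInProgress,
                              Set<String> stillOwnedPartitions,
                              Records fetchedForOwned,
                              Records empty) {
        if (rebalanceInProgress && stillOwnedPartitions.isEmpty()) {
            return empty;
        }
        return fetchedForOwned;
    }
}
{code}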



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-06-05 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088
 ] 

Richard Yu edited comment on KAFKA-8421 at 6/5/19 11:13 PM:


[~guozhang] There are a couple of simple approaches we could try to help 
prevent the consumer from dropping out of the group during a rebalance:
 # We could artificially extend the rebalance timeout to accommodate the time 
spent in poll, but considering that goes against the design described in KIP-62 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]),
 I don't think this is much of an option.
 # What we could otherwise attempt is an early termination of the record fetch 
request. In this scenario, we would cancel the request on the broker side, but by 
the time the user could call poll() again, it is possible that the rebalance 
timeout has expired.

What we could do instead is the following (see the sketch below). The poll() 
operation which begins during a rebalance would not block and wait for the entire 
time allocated by the user; instead, that user timeout would be split into smaller 
chunks of time, in between which the consumer could check for a possible rejoin 
(e.g. we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is 
needed, we would send a JoinGroupRequest.

On the broker side, we could modify the logic so that any pending fetch requests 
would be respected and continued to completion. The JoinGroup response would be 
held off for a particular consumer until its fetch request has completed, at 
which point we would send the (delayed) JoinGroupResponse. 

I think this is a good way of avoiding breaking the guarantees that Kafka has in 
place (particularly with regard to the rebalance timeout). WDYT of this 
approach?
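
A rough sketch of the chunked-wait idea, with placeholder methods (needsRejoin, sendJoinGroupRequest, pollFor) rather than actual consumer internals:

{code}
import java.time.Duration;

// Rough illustration of the chunked-wait idea above: split the caller's poll timeout
// into smaller slices and check between slices whether a rejoin (JoinGroup) is needed.
// The abstract methods are placeholders, not actual KafkaConsumer methods.
abstract class ChunkedPollSketch<T> {

    abstract boolean needsRejoin();
    abstract void sendJoinGroupRequest();
    abstract T pollFor(Duration slice);   // records fetched within the slice, or null if none

    T pollWithRejoinChecks(Duration total, Duration slice) {
        long remainingMs = total.toMillis();
        while (remainingMs > 0) {
            if (needsRejoin()) {
                // e.g. a 50-second wait checked every 5 seconds
                sendJoinGroupRequest();
                return null;
            }
            long waitMs = Math.min(slice.toMillis(), remainingMs);
            T records = pollFor(Duration.ofMillis(waitMs));
            if (records != null) {
                return records;
            }
            remainingMs -= waitMs;
        }
        return null;
    }
}
{code}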


was (Author: yohan123):
[~guozhang] There are a couple of simple approaches we could try to use to help 
prevent the consumer from dropping out of the group during rebalance:
 # We could artificially extend the rebalance timeout to accommodate the time 
spent in poll but considering that goes against the design described in KIP-62 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]),
 I don't think this is much of an option.
 # What we otherwise could do instead is attempt an early termination of the 
record fetch request. In this scenario, we would cancel the request on broker 
side, but by the time the user could call poll() again, it is possible that the 
rebalance timeout has expired.

What we could do instead is the the following. The poll() operation which 
begins during rebalance would not block and wait the entire allocated time by 
the user, but instead, have this user timeout split into smaller chunks of 
time, in between which, the consumer could check for a possible rejoin.  (i.e. 
we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is 
needed, we would send a JoinGroupRequest.

On broker side, we could modify the logic so that any pending fetch requests 
would be respected and be continued to completion. The JoinGroup response will 
be held off for a particular consumer until its fetch request has completed. In 
which case, we would then send the JoinGroupResponse (although it has been 
delayed). 

 I thought that this is a good way of avoiding breaking guarantees that Kafka 
had in place (particularly in regards to rebalance timeout). WDYT of this 
approach?

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers to still return 
> messages that still belong to its owned partitions even when it is within a 
> rebalance, because we know it is safe that no one else would claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked then.
> One thing we need to take care though is the rebalance timeout, i.e. when 
> consumer's processing those records

[jira] [Comment Edited] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-06-05 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088
 ] 

Richard Yu edited comment on KAFKA-8421 at 6/5/19 9:32 PM:
---

[~guozhang] There are a couple of simple approaches we could try to use to help 
prevent the consumer from dropping out of the group during rebalance:
 # We could artificially extend the rebalance timeout to accommodate the time 
spent in poll but considering that goes against the design described in 
[KIP-62|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]],
 I don't think this is much of an option.
 # What we otherwise could do instead is attempt an early termination of the 
record fetch request. In this scenario, we would cancel the request on broker 
side, but by the time the user could call poll() again, it is possible that the 
rebalance timeout has expired.

What we could do instead is the following. The poll() operation which 
begins during rebalance would not block and wait the entire allocated time by 
the user, but instead, have this user timeout split into smaller chunks of 
time, in between which, the consumer could check for a possible rejoin.  (i.e. 
we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is 
needed, we would send a JoinGroupRequest.

On broker side, we could modify the logic so that any pending fetch requests 
would be respected and be continued to completion. The JoinGroup response will 
be held off for a particular consumer until its fetch request has completed. In 
which case, we would then send the JoinGroupResponse (although it has been 
delayed). 

 I thought that this is a good way of avoiding breaking guarantees that Kafka 
had in place (particularly in regards to rebalance timeout). WDYT of this 
approach?


was (Author: yohan123):
[~guozhang] There are a couple of simple approaches we could try to use to help 
prevent the consumer from dropping out of the group during rebalance:
 # We could artificially extend the rebalance timeout to accommodate the time 
spent in poll but considering that goes against the design described in 
[KIP-62|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]],
 I don't think this is much of an option.
 # What we otherwise could do instead is attempt an early termination of the 
record fetch request. In this scenario, we would cancel the request on broker 
side, but by the time the user could call poll() again, it is possible that the 
rebalance timeout has expired.

What we could do instead is the the following. The poll() operation which 
begins during rebalance would not block and wait the entire allocated time by 
the user, but instead, have this user timeout split into smaller chunks of 
time, in between which, the consumer could check for a possible rejoin.  (i.e. 
we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is 
needed, we would send a JoinGroupRequest.

On broker side, we could modify the logic so that any pending fetch requests 
would be respected and be continued to completion. The JoinGroup response will 
be held off for a particular consumer until its fetch request has completed. In 
which case, we would then send the JoinGroupResponse (although it has been 
delayed). 

 I thought that this is a good way of avoiding breaking guarantees that Kafka 
had in place (particularly in regards to rebalance timeout). WDYT of this 
approach?

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers to still return 
> messages that still belong to its owned partitions even when it is within a 
> rebalance, because we know it is safe that no one else would claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked then.
> One thing we need to take care though is the rebalance timeout, i.e. when 
> consumer's processing thos

[jira] [Comment Edited] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-06-05 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088
 ] 

Richard Yu edited comment on KAFKA-8421 at 6/5/19 9:32 PM:
---

[~guozhang] There are a couple of simple approaches we could try to use to help 
prevent the consumer from dropping out of the group during rebalance:
 # We could artificially extend the rebalance timeout to accommodate the time 
spent in poll but considering that goes against the design described in KIP-62 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]),
 I don't think this is much of an option.
 # What we otherwise could do instead is attempt an early termination of the 
record fetch request. In this scenario, we would cancel the request on broker 
side, but by the time the user could call poll() again, it is possible that the 
rebalance timeout has expired.

What we could do instead is the following. The poll() operation which 
begins during rebalance would not block and wait the entire allocated time by 
the user, but instead, have this user timeout split into smaller chunks of 
time, in between which, the consumer could check for a possible rejoin.  (i.e. 
we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is 
needed, we would send a JoinGroupRequest.

On broker side, we could modify the logic so that any pending fetch requests 
would be respected and be continued to completion. The JoinGroup response will 
be held off for a particular consumer until its fetch request has completed. In 
which case, we would then send the JoinGroupResponse (although it has been 
delayed). 

 I thought that this is a good way of avoiding breaking guarantees that Kafka 
had in place (particularly in regards to rebalance timeout). WDYT of this 
approach?


was (Author: yohan123):
[~guozhang] There are a couple of simple approaches we could try to use to help 
prevent the consumer from dropping out of the group during rebalance:
 # We could artificially extend the rebalance timeout to accommodate the time 
spent in poll but considering that goes against the design described in 
[KIP-62|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]],
 I don't think this is much of an option.
 # What we otherwise could do instead is attempt an early termination of the 
record fetch request. In this scenario, we would cancel the request on broker 
side, but by the time the user could call poll() again, it is possible that the 
rebalance timeout has expired.

What we could do instead is the the following. The poll() operation which 
begins during rebalance would not block and wait the entire allocated time by 
the user, but instead, have this user timeout split into smaller chunks of 
time, in between which, the consumer could check for a possible rejoin.  (i.e. 
we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is 
needed, we would send a JoinGroupRequest.

On broker side, we could modify the logic so that any pending fetch requests 
would be respected and be continued to completion. The JoinGroup response will 
be held off for a particular consumer until its fetch request has completed. In 
which case, we would then send the JoinGroupResponse (although it has been 
delayed). 

 I thought that this is a good way of avoiding breaking guarantees that Kafka 
had in place (particularly in regards to rebalance timeout). WDYT of this 
approach?

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return 
> messages that belong to their owned partitions even when they are within a 
> rebalance, because we know it is safe that no one else will claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout: while the 
> consumer is processing those records it may not call the next poll() in time 
> (think: Kafka Streams num.iterations mechanism), which may lead to the consumer 
> dropping out of the group during rebalance.

[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-06-05 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857088#comment-16857088
 ] 

Richard Yu commented on KAFKA-8421:
---

[~guozhang] There are a couple of simple approaches we could try to use to help 
prevent the consumer from dropping out of the group during rebalance:
 # We could artificially extend the rebalance timeout to accommodate the time 
spent in poll but considering that goes against the design described in 
[KIP-62|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]],
 I don't think this is much of an option.
 # What we otherwise could do instead is attempt an early termination of the 
record fetch request. In this scenario, we would cancel the request on broker 
side, but by the time the user could call poll() again, it is possible that the 
rebalance timeout has expired.

What we could do instead is the following. The poll() operation which 
begins during rebalance would not block and wait the entire allocated time by 
the user, but instead, have this user timeout split into smaller chunks of 
time, in between which, the consumer could check for a possible rejoin.  (i.e. 
we split a wait of 50 seconds into intervals of 5 seconds). If a rejoin is 
needed, we would send a JoinGroupRequest.

On broker side, we could modify the logic so that any pending fetch requests 
would be respected and be continued to completion. The JoinGroup response will 
be held off for a particular consumer until its fetch request has completed. In 
which case, we would then send the JoinGroupResponse (although it has been 
delayed). 

 I thought that this is a good way of avoiding breaking guarantees that Kafka 
had in place (particularly in regards to rebalance timeout). WDYT of this 
approach?

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return 
> messages that belong to their owned partitions even when they are within a 
> rebalance, because we know it is safe that no one else will claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout: while the 
> consumer is processing those records it may not call the next poll() in time 
> (think: Kafka Streams num.iterations mechanism), which may lead to the consumer 
> dropping out of the group during rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure

2019-06-05 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856999#comment-16856999
 ] 

Richard Yu commented on KAFKA-8438:
---

Sure [~bchen225242], I will look into it and see what else I can think of.

> Add API to allow user to define end behavior of consumer failure
> 
>
> Key: KAFKA-8438
> URL: https://issues.apache.org/jira/browse/KAFKA-8438
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>  Labels: needs-dicussion, needs-kip
>
> Recently, in a concerted effort to make Kafka's rebalances less painful, 
> various approaches have been used to reduce the number and impact of 
> rebalances. Often, the trigger of a rebalance is a failure of some sort or a 
> thrown exception during processing, in which case the workload is 
> redistributed among surviving threads. To reduce rebalances caused by random 
> consumer crashes, a recent change was made to Kafka internals (introducing 
> the concept of static membership) that prevents a rebalance from occurring 
> within {{session.timeout.ms}}, in the hope that the crashed consumer thread 
> recovers in that time interval and rejoins the group.
> However, in some cases, consumer threads go down permanently or remain dead 
> for long periods of time. In these scenarios, users of Kafka might not be 
> aware of such a crash until hours after it happened, which forces them to 
> manually start a new KafkaConsumer process a considerable time after the 
> failure occurred. That is where the addition of a callback such as 
> {{onConsumerFailure}} would help. 
> There are multiple use cases for this callback (which is defined by the 
> user). {{onConsumerFailure}} is called when a particular consumer thread has 
> been down for some specified time interval (i.e. a config called 
> {{acceptable.consumer.failure.timeout.ms}}). When called, this method could 
> be used to log the consumer failure or, should the user wish it, create a new 
> thread which would then rejoin the consumer group (possibly reusing the 
> required {{group.instance.id}} so that a rebalance wouldn't be re-triggered; 
> we would need to think about that). 
> Should the old thread recover and attempt to rejoin the consumer group (with 
> the substitute thread being part of the group), the old thread would be 
> denied access and an exception would be thrown (to indicate that another 
> process has already taken its place).
>  
>  
>  
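
To make the proposed callback concrete, here is a purely hypothetical sketch; 
neither the interface, the method signature, nor the 
{{acceptable.consumer.failure.timeout.ms}} config exists in the Kafka clients 
today, so treat every name below as an assumption.

{code}
// Hypothetical listener interface; none of these names exist in Kafka today.
public interface ConsumerFailureListener {

    /**
     * Would be invoked when a group member has been unreachable for longer than
     * the proposed acceptable.consumer.failure.timeout.ms setting.
     */
    void onConsumerFailure(String groupInstanceId, long downSinceMs);
}

// Example user implementation: log the failure and (optionally) start a
// replacement consumer that reuses the same group.instance.id so that no
// additional rebalance is triggered.
class ReplaceFailedConsumer implements ConsumerFailureListener {

    @Override
    public void onConsumerFailure(String groupInstanceId, long downSinceMs) {
        System.err.printf("consumer %s down since %d, starting a replacement%n",
                groupInstanceId, downSinceMs);
        // new KafkaConsumer<>(configWithGroupInstanceId(groupInstanceId)) ...  (hypothetical)
    }
}
{code}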



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-05-31 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853099#comment-16853099
 ] 

Richard Yu commented on KAFKA-8421:
---

A key issue here is when we stop sending the old records of a consumer's 
assignment to the user. What we could consider doing is, whenever we are in a 
rebalance, checking the MemberState before sending to see whether it is still 
rebalancing. If the MemberState is no longer rebalancing, we simply abort any 
records we meant to send. 

onPartitionsRevoked/onPartitionsLost could also be a viable way to abort an 
ongoing poll operation for old records. 
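
Below is a hedged, user-level sketch of that idea: track revocations with a 
ConsumerRebalanceListener and filter what poll() returned down to partitions 
that are still owned, so records for revoked partitions never reach the 
application. This only approximates the internal change being discussed; the 
class name and the handler parameter are made up for the example.

{code}
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class OwnedOnlyPoller {

    public static void pollOwnedOnly(KafkaConsumer<String, String> consumer,
                                     List<String> topics,
                                     java.util.function.Consumer<ConsumerRecord<String, String>> handler) {
        Set<TopicPartition> revoked = ConcurrentHashMap.newKeySet();
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                revoked.addAll(partitions);    // remember what we no longer own
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                revoked.removeAll(partitions); // (re)assigned partitions are safe again
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (TopicPartition tp : records.partitions()) {
                if (revoked.contains(tp)) {
                    continue;                  // drop records for partitions revoked mid-rebalance
                }
                records.records(tp).forEach(handler);
            }
        }
    }
}
{code}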

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return 
> messages that belong to their owned partitions even when they are within a 
> rebalance, because we know it is safe that no one else will claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout: while the 
> consumer is processing those records it may not call the next poll() in time 
> (think: Kafka Streams num.iterations mechanism), which may lead to the consumer 
> dropping out of the group during rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-05-30 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852643#comment-16852643
 ] 

Richard Yu commented on KAFKA-8421:
---

Might try to take a crack at this one. It certainly is an interesting issue. :)

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return 
> messages that belong to their owned partitions even when they are within a 
> rebalance, because we know it is safe that no one else will claim those 
> partitions in this rebalance yet, and we can still commit offsets if, after 
> this rebalance, the partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout: while the 
> consumer is processing those records it may not call the next poll() in time 
> (think: Kafka Streams num.iterations mechanism), which may lead to the consumer 
> dropping out of the group during rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2019-05-30 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852428#comment-16852428
 ] 

Richard Yu commented on KAFKA-3539:
---

[~radai] I don't know whether this problem has been resolved. Issue 6705 was 
closed because it was thought that any changes would be too complex to address 
this behavioral issue.

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Oleg Zhurakousky
>Priority: Critical
>  Labels: needs-discussion, needs-kip
>
> You can get more details from the us...@kafka.apache.org list by searching 
> for the thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, 
> since that essentially violates the Future contract: it was specifically 
> designed to return immediately, passing control back to the user to check 
> for completion, cancel, etc.
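
For context, a hedged sketch of the workarounds people typically reach for 
(this does not fix the contract issue itself): cap the hidden blocking inside 
send() with the standard max.block.ms setting, and/or hand the call to a 
separate thread so the calling thread gets control back immediately. The 
executor, topic, and class names below are just for the example.

{code}
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public final class NonBlockingSend {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Bound how long send() may block internally (metadata fetch, full buffer).
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService sendExecutor = Executors.newSingleThreadExecutor();

        // send() can still block up to max.block.ms, so run it on another thread;
        // the caller immediately gets a Future wrapping the producer's own Future.
        Callable<Future<RecordMetadata>> task =
                () -> producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        Future<Future<RecordMetadata>> handle = sendExecutor.submit(task);

        sendExecutor.shutdown();
    }
}
{code}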



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

