Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-05 Thread John Roesler
Aha! I did miss that detail in your email before. Sorry for my density. It does seem like, if it turns out to be a problem, it would be pretty straightforward to add your proposal in. It wouldn't even require a version bump, because the wire protocol and the assignment algorithm would be mutually

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-05 Thread Guozhang Wang
Hello John, What I was thinking is that for other hosts of non-logged store only tasks, they will be ranked with a lag of "1" (as I tried to illustrate with "StatefulTasksToRankedCandidates[task][1] := instance" in the previous email), so that the current host would be ranked first, while others
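
A minimal sketch of the ranking idea above, for a single task whose stores are all non-logged: the current host reports a lag of 0, and every other instance is given the placeholder lag of "1", so the current host ranks first. The names `rank_candidates`, `preferred_order`, and `UNKNOWN_LAG_SENTINEL` are hypothetical and not part of Kafka Streams, and the alphabetical tie-break among equally ranked instances is an assumption made only to keep the output deterministic.

```python
from collections import defaultdict
from typing import Dict, List, Optional

# Placeholder lag for instances that do not host the task (the "1" in the message above).
UNKNOWN_LAG_SENTINEL = 1


def rank_candidates(reported_lags: Dict[str, Optional[int]]) -> Dict[int, List[str]]:
    """Group instances by effective lag for one task.

    reported_lags maps an instance id to its reported lag, or None when the
    instance reported nothing for this task.
    """
    ranked: Dict[int, List[str]] = defaultdict(list)
    # Sorting by instance id is an assumed tie-break for determinism.
    for instance, lag in sorted(reported_lags.items()):
        effective = UNKNOWN_LAG_SENTINEL if lag is None else lag
        ranked[effective].append(instance)
    # Return the ranks in ascending lag order.
    return dict(sorted(ranked.items()))


def preferred_order(reported_lags: Dict[str, Optional[int]]) -> List[str]:
    """Flatten the ranking into one candidate list, lowest lag first."""
    return [i for group in rank_candidates(reported_lags).values() for i in group]
```

For example, `preferred_order({"b": None, "a": 0, "c": None})` places `"a"` (the current host) ahead of the other two instances.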

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread John Roesler
I see; thanks for the clarification, Guozhang. If the prior owner of a non-logged, stateful task always reports 0, and all other instances report nothing, then it seems like we need a special case in the assignor to handle assigning these. I.e., the strategy of creating "movement" standbys and

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Guozhang Wang
Hi John, I think I may have miscommunicated my ideas a bit. What I meant is simply, for case 1), to let the active tasks report a lag of 0 instead of not reporting any lag at all; in addition, for other hosts not hosting these tasks, where we do not know the offset lag at all, we set the

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread John Roesler
Thanks for the reply, Guozhang, I'll start by re-stating your cases, just to make sure we're on the same page... 1) The task is stateful, and all its stores are non-logged. For this case under the proposal, we would not have any standbys and the active version would actually not report any lag

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Guozhang Wang
Hello John, Your reasoning about non-logged state stores looks good to me. But I think it is worth considering two cases differently: 1) all of a task's state stores are non-logged, 2) a task has a non-logged state store but also has other logged state stores. For 1), we should not have any standbys for

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Bruno Cadonna
Hi John, Thank you for your answer. Your assumptions sound reasonable to me. Best, Bruno On Wed, Sep 4, 2019 at 5:30 PM John Roesler wrote: > > Hey Bruno, > > Thanks for taking another look. Some quick responses: > > 1) It just means the number of offsets in the topic. E.g., the LSO is 100, >

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread John Roesler
Hey Bruno, Thanks for taking another look. Some quick responses: 1) It just means the number of offsets in the topic. E.g., the LSO is 100, but the first offset is 40 due to retention, so there are 60 offsets in the topic. Further, the lag on that topic would be considered to be 60 for any task
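
The arithmetic in point 1) can be sketched as follows. The function names are hypothetical and are not Kafka Streams APIs. Treating an instance without a checkpoint as lagging by the full retained range, and clamping a checkpoint into the retained range, are assumptions made for illustration.

```python
from typing import Optional


def offsets_in_topic(first_offset: int, end_offset: int) -> int:
    """Number of offsets currently retained in a changelog partition."""
    return end_offset - first_offset


def task_lag(first_offset: int, end_offset: int,
             checkpoint: Optional[int] = None) -> int:
    """Lag of one instance on one changelog partition.

    With no checkpoint, the lag is the full set of retained offsets.
    With a checkpoint (assumed behavior), only the offsets after it remain,
    and a checkpoint older than the retained range counts as no progress.
    """
    if checkpoint is None:
        return offsets_in_topic(first_offset, end_offset)
    start = min(max(checkpoint, first_offset), end_offset)
    return end_offset - start
```

With the numbers from the message, `offsets_in_topic(40, 100)` gives 60, which is also the value of `task_lag(40, 100)` for an instance that has no checkpoint.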

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Bruno Cadonna
Hi, 1) What do you mean by "full set of offsets in the topic"? Is this the sum of all offsets of the changelog partitions of the task? 2) I am not sure whether non-logged stateful tasks should be effectively treated as stateless tasks during assignment. First we need to decide whether a

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread John Roesler
Hi Guozhang, > My impression from your previous email is that inside the algorithm when we are "filling" them to instances some deterministic logic would be used to avoid the above case, is that correct? Yes, that was my plan, but I didn't formalize it. There was a requirement that the

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread Guozhang Wang
Hello John, That sounds reasonable. I just double-checked the code: with logging disabled, the corresponding checkpoint file would not contain any values, just like a stateless task. So I think treating them logically the same is fine. Guozhang On Wed, Aug 21, 2019 at 11:41 AM John Roesler

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread Guozhang Wang
Yes, that makes sense to me. I was mainly curious to see how we would avoid thrashing stateless tasks back and forth while still guaranteeing "convergence", since we do not require any stickiness. My impression from your previous email is that inside the algorithm when we are "filling" them to instances

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread John Roesler
Hi again, Guozhang, While writing up the section on stateless tasks ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks), I reconsidered whether stateful, but non-logged, tasks should actually

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread John Roesler
Thanks, Guozhang. (Side note: I noticed on another pass over the discussion that I'd missed addressing your comment about the potential race condition between state cleanup and lag-based assignment. I've added a solution to the proposal:

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread Guozhang Wang
Hello John, I've made another pass on the wiki page; overall LGTM. One meta comment about the "stateless" tasks: how do we represent them in the metadata? Are they just treated as stateful tasks with logging disabled, or are they handled specially? It is not very clear in the description.

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread Guozhang Wang
Hi John, Thanks for the added section, I agree with your reasoning and I think we can still use the standby replicas now. Guozhang On Tue, Aug 20, 2019 at 3:13 PM John Roesler wrote: > Hi All, > > Thanks for the discussion. I've been considering the idea of giving the > "catching up" tasks a

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-21 Thread John Roesler
I have also specifically called out that the assignment must achieve both "instance" and "task" balance: https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22 I've also addressed the problem

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-20 Thread John Roesler
In response to Bruno's concern #2, I've also added that section to the "Rejected Alternatives" section. Additionally, after reviewing some other assignment papers, I've developed the concern that specifying which "phases" the assignment algorithm should have, or indeed the logic of it at all,

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-20 Thread John Roesler
Hi All, Thanks for the discussion. I've been considering the idea of giving the "catching up" tasks a different name/role. I was in favor initially, but after working through some details, I think it causes some problems, which I've written up in the "rejected alternatives" part of the KIP:

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Guozhang Wang
I think I agree with you, Sophie. My gut feeling is that 1) standby tasks not catching up should not be a major concern of the assignor's algorithm, but should rather be tackled in different modules, and 2) a lot of optimization can be done in the stream thread itself, like dedicated threading and

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Sophie Blee-Goldman
> we may have other ways to avoid starving the standby tasks, for example, by using dedicated threads for standby tasks or even by having *higher priority for standby than active* so that we always try to catch up standby first, then process active. This is an interesting idea, but seems

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Sophie Blee-Goldman
Stateful tasks with logging disabled seem to be an interesting edge case. On the one hand, for balancing purposes they should be considered stateful since, as Guozhang pointed out, they are still "heavy" in IO costs. But for "catching up" purposes, i.e., when allocating standby tasks that will become

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Guozhang Wang
Regarding 3) above: I think active tasks should still be considered stateful, since the processor would still pay the IO cost of accessing the store, but they would not have standby tasks? On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna wrote: > Hi, > > Thank you for the KIP! > > Some

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Bruno Cadonna
Hi, Thank you for the KIP! Some questions/comments: 1. I am wondering if the "stand-by" tasks that catch up state before the active task is switched deserve their own name in this KIP and maybe in the code. We have already stated that they are not true stand-by tasks, they are not configured

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Guozhang Wang
1. Sounds good, just wanted to clarify; and it may be worth documenting it so that users would not be surprised when monitoring their footprint. 2. Hmm I see... I think the trade-off can be described as "how much imbalance would bother you enough to be willing to pay for another rebalance, along with

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-07 Thread John Roesler
Hey Guozhang, Thanks for the review! 1. Yes, even with `num.standby.replicas := 0`, we will still temporarily allocate standby tasks to accomplish a no-downtime task migration. Although, I'd argue that this doesn't really violate the config, as the task isn't a true hot standby. As soon as it

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-06 Thread Guozhang Wang
Hello Sophie, Thanks for the proposed KIP. I left some comments on the wiki itself, and I think I'm still not very clear on a couple of those: 1. With this proposal, does that mean that with num.standby.replicas == 0, we may sometimes still have some standby tasks, which may violate the config? 2. I

[DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-06 Thread Sophie Blee-Goldman
Hey all, I'd like to kick off discussion on KIP-441, aimed at the long restore times in Streams during which further active processing and IQ are blocked. Please give it a read and let us know your thoughts