Hi Guozhang -

Thanks for the replies and for directing me to the existing JIRAs. I think
that a two-phase rebalance will be quite useful.

1) For clarity's sake, I should have just asked: when a new thread / node
is created and tasks are rebalanced, are the state stores on the new
threads/nodes fully restored during rebalancing, thereby blocking *any and
all* threads from proceeding with processing until restoration is complete?
I do not believe this is the case; rather, only the threads assigned the
new tasks are paused until state store restoration is complete.
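
As an aside, while digging into this I found it useful to watch restoration
per store. Below is a minimal sketch of my own (the class name and log lines
are illustrative); setGlobalStateRestoreListener and the StateRestoreListener
callbacks are the actual Kafka Streams hooks:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        // fires when changelog replay for a store partition begins
        System.out.printf("restore start: %s %s [%d..%d]%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        // fires after each batch of changelog records is written to the store
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName,
                             long totalRestored) {
        // fires once the store partition has caught up with its changelog
        System.out.printf("restore end: %s %s (%d records)%n",
                storeName, partition, totalRestored);
    }
}

// usage, before calling streams.start():
// streams.setGlobalStateRestoreListener(new LoggingRestoreListener());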


Thanks for your help - I appreciate you taking the time to reply.

Adam



On Wed, Feb 20, 2019 at 8:38 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Adam,
>
> Sorry for the late reply on this thread; I've put my comments inline
> below.
>
> On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Hey Folks
> >
> > I have a few questions around the operations of stateful processing while
> > scaling nodes up/down, and a possible KIP in question #4. Most of them
> > have to do with task processing during rebuilding of state stores after
> > scaling nodes up.
> >
> > Scenario:
> > Single node/thread, processing 2 topics (10 partitions each):
> > User event topic (events) - i.e. key: userId, value: productId
> > Product topic (entity) - i.e. key: productId, value: productData
> >
> > My topology looks like this:
> >
> > KTable productTable = ... // materialize from product topic
> >
> > userStream
> >     .map((userId, productId) -> KeyValue.pair(productId, userId)) // swap key and value
> >     .join(productTable, ...) // joiner is not relevant here
> >     .to(...); // send it to some output topic
> >
> >
> > Here are my questions:
> > 1) If I scale the processing node count up, partitions will be rebalanced
> > to the new node. Does processing continue as normal on the original node,
> > while the new node's processing is paused as the internal state stores
> > are rebuilt/reloaded? From my reading of the code (and own experience) I
> > believe this to be the case, but I am just curious in case I missed
> > something.
> >
> >
> With 2 topics and 10 partitions each, assuming the default PartitionGrouper
> is used, there should be a total of 20 tasks created: 10 tasks for the map,
> which writes to an internal repartition topic, and 10 tasks for the join,
> since these two topics are co-partitioned for joins.
>
> For example, task-0 would be processing the join from
> user-topic-partition-0 and product-topic-partition-0, and so on.
>
> With a single thread, all of these 20 tasks will be allocated to that
> thread, which processes them in an iterative manner. Note that since each
> task has its own state store (e.g. product-state-store-0 for task-0, etc.),
> this thread will host all 10 sets of state stores as well (the 10 mapping
> tasks have no state stores at all).
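
To see this split concretely, you can print the topology description. Below
is a small self-contained sketch of my own (topic names and the joiner are
illustrative, not from the thread); describe() lists the two sub-topologies,
and with 10 partitions per input topic each sub-topology yields 10 tasks:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class DescribeTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> productTable = builder.table("product-topic");
        KStream<String, String> userStream = builder.stream("user-topic");

        userStream
            // swapping the key forces a repartition before the join
            .map((userId, productId) -> KeyValue.pair(productId, userId))
            .join(productTable, (userId, productData) -> userId + "|" + productData)
            .to("output-topic");

        // one sub-topology up to the repartition sink, one from the
        // repartition source through the join: 10 + 10 = 20 tasks
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}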
>
> When you add new threads, either within the same node or on a different
> node, after the rebalance each thread should be processing 10 tasks, and
> hence owns the corresponding set of state stores. A new thread will first
> restore the state stores it is assigned before it starts processing.
>
>
> > 2) What happens to the userStream map task? Will the new node be able to
> > process this task while the state store is rebuilding/reloading? My
> > reading of the code suggests that this map process will be paused on the
> > new node while the state store is rebuilt. The effect of this is that it
> > will lead to a delay in events reaching the original node's partitions,
> > which will be seen as late-arriving events. Am I right in this assessment?
> >
> >
> Currently the thread will NOT start processing any tasks until ALL stateful
> tasks complete restoring (stateless tasks, like the map tasks in your
> example, never need restoration at all). There's an open JIRA for making
> this customizable but I cannot find it currently.
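
If it helps others reading along: this pause is also visible at the client
level. As I understand it, the KafkaStreams instance stays in REBALANCING
while restoration runs and only transitions to RUNNING afterwards. A tiny
sketch of my own ("streams" is an assumed, not-yet-started KafkaStreams
instance):

// log client-level state transitions, e.g. REBALANCING -> RUNNING
streams.setStateListener((newState, oldState) ->
        System.out.println("state change: " + oldState + " -> " + newState));
streams.start();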
>
>
> > 3) How does scaling up work with standby state-store replicas? From my
> > reading of the code, it appears that scaling a node up will result in a
> > rebalance, with the state assigned to the new node being rebuilt first
> > (leading to a pause in processing). Following this, the standby replicas
> > are populated. Am I correct in this reading?
> >
> Standby tasks run in parallel with the active stream tasks: each one simply
> reads from the changelog topic in real time and populates its standby store
> replica. When scaling out, instances with standby tasks will be preferred
> over those that do not have any standby for the task, and hence when
> restoring only a very small amount of data needs to be restored (think: the
> standby replica of the store is already populated up to offset 90 at the
> rebalance, while the active task is writing to the changelog topic with log
> end offset 100, so you only need to restore offsets 90 to 100 instead of 0
> to 100).
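
For reference, the setting behind this is num.standby.replicas (default 0).
A minimal config sketch of my own; the application id and bootstrap servers
are illustrative:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-product-join"); // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
// keep one warm replica of each task's state on another instance, so a
// rebalance only needs to restore the changelog tail (offsets 90-100
// rather than 0-100 in the example above)
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);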
>
>
> > 4) If my reading in #3 is correct, would it be possible to pre-populate
> > the standby stores on scale-up before initiating active-task transfer?
> > This would allow seamless scale-up and scale-down without requiring any
> > pauses for rebuilding state. I am interested in kicking this off as a KIP
> > if so, but would appreciate any JIRAs or related KIPs to read up on prior
> > to digging into this.
> >
> Yes, there is some discussion about this here:
> https://issues.apache.org/jira/browse/KAFKA-6145
>
>
> >
> > Thanks
> >
> > Adam Bellemare
> >
>
>
> --
> -- Guozhang
>
