Hello Adam,

Sorry for being late replying on this thread, I've put my comments inlined
below.

On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hey Folks
>
> I have a few questions around the operations of stateful processing while
> scaling nodes up/down, and a possible KIP in question #4. Most of them have
> to do with task processing during rebuilding of state stores after scaling
> nodes up.
>
> Scenario:
> Single node/thread, processing 2 topics (10 partitions each):
> User event topic (events) - ie: key:userId, value: ProductId
> Product topic (entity) - ie: key: ProductId, value: productData
>
> My topology looks like this:
>
> KTable productTable = ... //materialize from product topic
>
> KStream output = userStream
>     .map(x => (x.value, x.key) ) //Swap the key and value around
>     .join(productTable, ... ) //Joiner is not relevant here
>     .to(...)  //Send it to some output topic
>
>
> Here are my questions:
> 1) If I scale the processing node count up, partitions will be rebalanced
> to the new node. Does processing continue as normal on the original node,
> while the new node's processing is paused as the internal state stores are
> rebuilt/reloaded? From my reading of the code (and own experience) I
> believe this to be the case, but I am just curious in case I missed
> something.
>
>
With 2 topics and 10 partitions each, assuming the default PartitionGrouper
is used, there should be a total of 20 tasks (10 tasks for map which will
send to an internal repartition topic, and 10 tasks for doing the join)
created since these two topics are co-partitioned for joins.

For example, task-0 would be processing the join from
user-topic-partition-0 and product-topic-partition-0, and so on.

With a single thread, all of these 20 tasks will be allocated to this
thread, which would process them in an iterative manner. Note that since
each task has its own state store (e.g. product-state-store-0 for task-0,
etc), it means this thread will host all the 10 sets of state stores as
well (note for the 10 mapping tasks there's no state stores at all).

When you add new threads either within the same node, or on a different
node, after rebalance each thread should be processing 10 tasks, and hence
owning corresponding set of state stores due to rebalance. The new thread
will first restore the state stores it gets assigned before start
processing.


> 2) What happens to the userStream map task? Will the new node be able to
> process this task while the state store is rebuilding/reloading? My reading
> of the code suggests that this map process will be paused on the new node
> while the state store is rebuilt. The effect of this is that it will lead
> to a delay in events reaching the original node's partitions, which will be
> seen as late-arriving events. Am I right in this assessment?
>
>
Currently the thread will NOT start processing any tasks until ALL stateful
tasks completes restoring (stateless tasks, like the map tasks in your
example never needs restoration at all). There's an open JIRA for making it
customizable but I cannot find it currently.


> 3) How does scaling up work with standby state-store replicas? From my
> reading of the code, it appears that scaling a node up will result in a
> reabalance, with the state assigned to the new node being rebuilt first
> (leading to a pause in processing). Following this, the standy replicas are
> populated. Am I correct in this reading?
>
> Standby tasks are running in parallel with active stream tasks, and it
simply reads from the changelog topic in read time and populate the standby
store replica; when scaling out, the instances with standby tasks will be
preferred over those who do not have any standby for the task, and hence
when restoring only a very small amount of data needs to be restored
(think: the standby replica of the store is already populated up to offset
90 at the rebalance, while the active task is writing to the changelog
topic with log end offset 100, so you only need to restore 90 - 100 instead
of 0 - 100).


> 4) If my reading in #3 is correct, would it be possible to pre-populate the
> standby stores on scale-up before initiating active-task transfer? This
> would allow seamless scale-up and scale-down without requiring any pauses
> for rebuilding state. I am interested in kicking this off as a KIP if so,
> but would appreciate any JIRAs or related KIPs to read up on prior to
> digging into this.
>
> Yes, there is some discussions about this here:
https://issues.apache.org/jira/browse/KAFKA-6145


>
> Thanks
>
> Adam Bellemare
>


-- 
-- Guozhang

Reply via email to