Hello Srikanth,

Thanks for your questions; please see my replies inline.


On Tue, May 17, 2016 at 7:36 PM, Srikanth <srikanth...@gmail.com> wrote:

> Hi,
>
> I was reading about Kafka streams and trying to understand its programming
> model.
> Some observations that I wanted to get some clarity on..
>
> 1) Joins & aggregations use an internal topic for shuffle. Source
> processors will write to this topic with the key used for join. Then it is
> free to commit offset for that run.
> Does that mean we have to rely on internal topic's replication to guarantee
> at-least-once processing?
>
That is right. Users can set the "replication.factor" config in
StreamsConfig to control the replication factor of these internal topics.
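For example, a minimal config sketch (assuming the same "replication.factor" key that StreamsConfig exposes; 3 is an illustrative value):

```properties
# Streams application config: replicate the internally created
# (repartition/changelog) topics three ways so at-least-once
# processing survives a single broker failure.
replication.factor=3
```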



>   Won't these replicated internal topics put too much load on brokers? We
> have to scale up brokers whenever our stream processing needs increase.
> It's easy to add/remove processing nodes to run kstream app instance but
> adding/removing brokers is not that straight forward.
>
This is a good question. We are working on recommendations for how to
easily re-process data (possibly after scaling partitions) with Kafka
Streams, and Matthias (cc'ed) may be able to get back to you on this.


>
> 2) How many partitions are created for the internal topic? What is the
> retention policy?
> When/how is it created and cleaned up when streaming application is
> shutdown permanently?
> Any link with deep dive into this will be helpful.
>
The number of partitions is currently set to the number of tasks writing
to those topics, which in turn is determined by the number of partitions
of the source topic. The internal topic uses the default retention policy
and is created upon task initialization.
For example, if you have a topology as:

source-topic -> sourceNode -> processNode -> sinkNode -> internal-topic ->
sourceNode -> ...

Then the internal-topic's number of partitions is equal to the number of
partitions of source-topic.

We are still working on cleaning up such topics if users do not want to
re-process any more, there is a JIRA open for this:
https://issues.apache.org/jira/browse/KAFKA-3185


>
>  3) I'm a bit confused on the threading model. Let's take the example below.
> Assume "PageViews" and "UserProfile" have four partitions each.
> I start two instances of this app, both with two threads. So we have four
> threads altogether.
> My understanding is that each thread will now receive records from one
> partition in both topics. After reading they'll do the map, filter, etc and
> write to internal topic for join.
> Then the threads go on to read the next set of records from previous
> offset. I guess this can be viewed as some sort of chaining with in a
> stage.
>
>   Now where does the thread that reads from internal topic run? Do they
> share the same set of threads?
>

Sub-topologies that are "cut" by the internal topics are considered
disconnected, and are assigned to different tasks.

So again back to this example:

[ source-topic -> sourceNode -> processNode -> sinkNode -> ]  [
internal-topic -> sourceNode -> ... ]

It is translated into two sub-topologies. With four partitions of the
source-topic, there will be 8 tasks in total: 4 for the first
sub-topology and 4 for the second. Multiple tasks can be hosted on the
same thread, so if you have 4 threads in total each one will host 2
tasks; the assignment is done by the library so as to shorten the
restoration process.
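As a rough sketch of the arithmetic above (plain Java, not the actual Streams assignment logic, which also accounts for state-store locality), here is how 8 tasks (2 sub-topologies x 4 partitions) could be spread round-robin over 4 threads, 2 tasks each:

```java
import java.util.*;

public class TaskAssignmentSketch {
    // Distribute task ids over numThreads threads round-robin.
    public static Map<Integer, List<String>> assign(List<String> tasks, int numThreads) {
        Map<Integer, List<String>> byThread = new HashMap<>();
        for (int i = 0; i < tasks.size(); i++) {
            byThread.computeIfAbsent(i % numThreads, t -> new ArrayList<>())
                    .add(tasks.get(i));
        }
        return byThread;
    }

    public static void main(String[] args) {
        // Task id: <sub-topology>_<partition>, 2 sub-topologies x 4 partitions = 8 tasks.
        List<String> tasks = new ArrayList<>();
        for (int subTopology = 0; subTopology < 2; subTopology++)
            for (int partition = 0; partition < 4; partition++)
                tasks.add(subTopology + "_" + partition);

        for (Map.Entry<Integer, List<String>> e : assign(tasks, 4).entrySet())
            System.out.println("thread-" + e.getKey() + " -> " + e.getValue());
    }
}
```

Note that with this layout each thread ends up hosting the two tasks that read the same partition number of the two sub-topologies.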

> Can we increase the parallelism for just this processor? If I know that the
> join().map().process() chain is more compute intensive.
>
>   KStream<String, GenericRecord> views = builder.stream("PageViews")
>       .map(...)
>       .filter(...)
>
>   KTable<String, GenericRecord> users = builder.table("UserProfile")
>       .mapValues(...)
>
>   KStream<String, Long> regionCount = views
>       .leftJoin(user, ...)
>       .map(...)
>       .process(...)
>
>
If you want to increase the parallelism of only part of the topology, you
can use a "through()" call with an intermediate topic that you create
yourself. This effectively cuts the topology into disconnected
sub-topologies, and by controlling the number of partitions of this
intermediate topic you can control the parallelism of the different
sub-topologies.
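Applied to your example, a hedged sketch (the topic name "views-by-key" is hypothetical; you would create it yourself, with, say, 16 partitions, before starting the app):

```java
// through() writes the stream to the user-created intermediate topic and
// reads it back, cutting the topology here; the downstream map()/process()
// sub-topology then runs with as many tasks as that topic has partitions.
KStream<String, Long> regionCount = views
    .leftJoin(users, ...)
    .through("views-by-key")   // user-created topic, e.g. 16 partitions
    .map(...)
    .process(...);
```

Note the cut is placed after the join, since the join itself still requires the stream to be co-partitioned with the table.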


> I hope I was able to explain my questions clearly enough for you to
> understand.
>
> Thanks,
> Srikanth
>



-- 
-- Guozhang
