Hi Greg,

You can apply multiple operations to the same stream object in the Kafka
Streams DSL, so the source topic only needs to be registered once. For example:

stream1 = builder.stream("topic1");

stream2 = stream1.map(/*set key*/).through("topic2").aggregateByKey(...);

stream3 = stream1.map(/*a different key*/).through("topic3").aggregateByKey(...);
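
To make the fan-out idea concrete without depending on any particular Kafka Streams version, here is a rough, library-free sketch in plain java.util. It is only an analogy, not the Kafka Streams API: the Event class, the countBy helper, and the sample data are hypothetical names made up for illustration. The point is simply that one source object is created once and two independently keyed aggregations both read from it.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class FanOutSketch {
    // Hypothetical message type standing in for a record read from "topic1".
    public static final class Event {
        public final String user;
        public final String page;

        public Event(String user, String page) {
            this.user = user;
            this.page = page;
        }
    }

    // Re-keys each event with keyFn and counts events per key.
    // This plays the role of map(/*set key*/) followed by an aggregation.
    public static Map<String, Long> countBy(List<Event> source,
                                            Function<Event, String> keyFn) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : source) {
            counts.merge(keyFn.apply(e), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The source is defined exactly once.
        List<Event> stream1 = List.of(
            new Event("alice", "/home"),
            new Event("bob", "/home"),
            new Event("alice", "/docs"));

        // Two independent aggregations fan out from the same source object.
        Map<String, Long> byUser = countBy(stream1, e -> e.user);
        Map<String, Long> byPage = countBy(stream1, e -> e.page);

        System.out.println(byUser); // {alice=2, bob=1}
        System.out.println(byPage); // {/docs=1, /home=2}
    }
}
```

In the real DSL the re-keyed stream also has to be repartitioned by the new key before aggregating, which is what the through("topic2") and through("topic3") calls above are for; the in-memory sketch has no equivalent of that step.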

-----------------

Guozhang


On Thu, Apr 14, 2016 at 10:05 AM, Greg Fodor <gfo...@gmail.com> wrote:

> Hi all, I'm working on a Kafka Streaming job and I've hit a case I
> didn't see straightforward support for. I have an input stream topic
> I'd like to perform two separate aggregations on, but building up two
> parallel sub-topologies with the same source topic at the source
> results in an error: "Topic <topic> has already been registered by
> another source."
>
> I was wondering if the right solution here is to duplicate the stream
> being read off from the topic (in-memory) and then performing the
> aggregations on the copies of the stream. The only method on KStream I
> found that seems to let me generate multiple streams from a single one
> is the branch() method, which takes in a list of predicates. So, the
> workaround here seems to be to perform a flatMap that will repeat the
> rows N times (injecting some index for each copy) and then
> de-interleaving the copies with a branch. This seems like it would
> work, but it will require a lot of cruft: creating the intermediate
> data structures, the branch predicate code, etc.
>
> It seems like a natural operation to be able to fan out a stream into
> multiple copies for downstream aggregation/joining. Is there an easy
> way to do this in the current API or is it something that would be a
> useful addition?
>



-- 
-- Guozhang
