Hi Greg,

You can apply multiple operations to the same KStream object in the Kafka Streams DSL, so the source topic only needs to be registered once. For example:

stream1 = builder.stream("topic1");
stream2 = stream1.map(/* set key */).through("topic2").aggregateByKey(...);
stream3 = stream1.map(/* a different key */).through("topic3").aggregateByKey(...);

-----------------
Guozhang

On Thu, Apr 14, 2016 at 10:05 AM, Greg Fodor <gfo...@gmail.com> wrote:

> Hi all, I'm working on a Kafka Streaming job and I've hit a case I
> didn't see straightforward support for. I have an input stream topic
> I'd like to perform two separate aggregations on, but building up two
> parallel sub-topologies with the same source topic at the source
> results in an error: "Topic <topic> has already been registered by
> another source."
>
> I was wondering if the right solution here is to duplicate the stream
> being read off from the topic (in-memory) and then perform the
> aggregations on the copies of the stream. The only method on KStream I
> found that seems to let me generate multiple streams from a single one
> is the branch() method, which takes in a list of predicates. So, the
> workaround here seems to be to perform a flatMap that will repeat the
> rows N times (injecting some index for each copy) and then
> de-interleave the copies with a branch. This seems like it would
> work, but it will require a lot of cruft: creating the intermediate
> data structures, the branch predicate code, etc.
>
> It seems like a natural operation to be able to fan out a stream into
> multiple copies for downstream aggregation/joining. Is there an easy
> way to do this in the current API or is it something that would be a
> useful addition?
>

--
-- Guozhang
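
As a library-free illustration of the fan-out pattern in the reply above, here is a minimal sketch in plain Java. It is not Kafka Streams code and does not use any Kafka API: it only models the idea that one source can feed two independent "re-key, then aggregate" pipelines without being duplicated or read twice. The names FanOutSketch, Event, and countBy are hypothetical, chosen for this example only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

class FanOutSketch {
    // Hypothetical message type standing in for a record read from the source topic.
    static final class Event {
        final String user;
        final String page;

        Event(String user, String page) {
            this.user = user;
            this.page = page;
        }
    }

    // Re-key each event with keyFn, then count events per key.
    // This stands in for the "map to a new key, then aggregate by key" step.
    static Map<String, Long> countBy(List<Event> source, Function<Event, String> keyFn) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : source) {
            counts.merge(keyFn.apply(e), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The single source, defined once (analogous to stream1).
        List<Event> source = Arrays.asList(
                new Event("alice", "/home"),
                new Event("bob", "/home"),
                new Event("alice", "/docs"));

        // Two independent aggregations over the same source (analogous to stream2 and stream3).
        Map<String, Long> byUser = countBy(source, e -> e.user);
        Map<String, Long> byPage = countBy(source, e -> e.page);

        System.out.println(byUser); // {alice=2, bob=1}
        System.out.println(byPage); // {/docs=1, /home=2}
    }
}
```

The point of the sketch is only that the source is referenced once and each downstream branch picks its own key, which is why no flatMap/branch de-interleaving step is needed.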