great, thanks again!
On Thu, Apr 14, 2016 at 12:07 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Hi Greg, > > You can apply multiple operations on the same streams object in the Kafka > Streams DSL, for example: > > stream1 = builder.stream("topic1"); > > stream2 = stream1.map(/*set key*/).through("topic2").aggregateByKey(...); > > stream3 = stream1.map(/*a different > key*/).through("topci3").aggregateByKey(...); > > ----------------- > > Guozhang > > > On Thu, Apr 14, 2016 at 10:05 AM, Greg Fodor <gfo...@gmail.com> wrote: > >> Hi all, I'm working on a Kafka Streaming job and I've hit a case I >> didn't see straightforward support for. I have an input stream topic >> I'd like to perform two separate aggregations on, but building up two >> parallel sub-topologies with the same source topic at the source >> results in an error: "Topic <topic> has already been registered by >> another source." >> >> I was wondering if the right solution here is to duplicate the stream >> being read off from the topic (in-memory) and then performing the >> aggregations on the copies of the stream. The only method on KStream I >> found that seems to let me generate multiple streams from a single one >> is the branch() method, which takes in a list of predicates. So, the >> workaround here seems to be to perform a flatMap that will repeat the >> rows N times (injecting some index for each copy) and then >> de-interleaving the copies with a branch. This seems like it would >> work, but it will require a lot of cruft: creating the intermediate >> data structures, the branch predicate code, etc. >> >> It seems like a natural operation to be able to fan out a stream into >> multiple copies for downstream aggreation/joining. Is there an easy >> way to do this in the current API or is it something that would be a >> useful addition? >> > > > > -- > -- Guozhang