great, thanks again!

On Thu, Apr 14, 2016 at 12:07 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Greg,
>
> You can apply multiple operations on the same streams object in the Kafka
> Streams DSL, for example:
>
> stream1 = builder.stream("topic1");
>
> stream2 = stream1.map(/*set key*/).through("topic2").aggregateByKey(...);
>
> stream3 = stream1.map(/*a different key*/).through("topic3").aggregateByKey(...);
>
> -----------------
>
> Guozhang
>
>
> On Thu, Apr 14, 2016 at 10:05 AM, Greg Fodor <gfo...@gmail.com> wrote:
>
>> Hi all, I'm working on a Kafka Streaming job and I've hit a case I
>> didn't see straightforward support for. I have an input stream topic
>> I'd like to perform two separate aggregations on, but building up two
>> parallel sub-topologies with the same source topic at the source
>> results in an error: "Topic <topic> has already been registered by
>> another source."
>>
>> I was wondering if the right solution here is to duplicate the stream
>> being read off the topic (in-memory) and then perform the
>> aggregations on the copies of the stream. The only method on KStream I
>> found that seems to let me generate multiple streams from a single one
>> is the branch() method, which takes in a list of predicates. So, the
>> workaround here seems to be to perform a flatMap that will repeat the
>> rows N times (injecting some index for each copy) and then
>> de-interleave the copies with a branch. This seems like it would
>> work, but it will require a lot of cruft: creating the intermediate
>> data structures, the branch predicate code, etc.
>>
>> It seems like a natural operation to be able to fan out a stream into
>> multiple copies for downstream aggregation/joining. Is there an easy
>> way to do this in the current API or is it something that would be a
>> useful addition?
>>
>
>
>
> --
> -- Guozhang
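
The fan-out discussed in the quoted reply (one source feeding two
independently keyed aggregations) can be modelled with plain Java
collections. This is only an analogy and not Kafka Streams code: the Event
class, its fields, and the methods sumByUser and sumByPage are hypothetical
names, and an in-memory list stands in for the stream read from "topic1".

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class FanOutSketch {
    // Hypothetical message type standing in for a record read from "topic1".
    public static final class Event {
        final String user;
        final String page;
        final int count;

        public Event(String user, String page, int count) {
            this.user = user;
            this.page = page;
            this.count = count;
        }
    }

    // First branch: key by user, then sum the counts per key.
    public static Map<String, Integer> sumByUser(List<Event> source) {
        return source.stream().collect(
            Collectors.groupingBy(e -> e.user, TreeMap::new,
                                  Collectors.summingInt(e -> e.count)));
    }

    // Second branch: same source, different key (page), same aggregation.
    public static Map<String, Integer> sumByPage(List<Event> source) {
        return source.stream().collect(
            Collectors.groupingBy(e -> e.page, TreeMap::new,
                                  Collectors.summingInt(e -> e.count)));
    }

    public static void main(String[] args) {
        // A single source is defined once and reused by both branches,
        // so nothing has to be duplicated or de-interleaved.
        List<Event> source = List.of(
            new Event("alice", "/home", 1),
            new Event("bob", "/home", 2),
            new Event("alice", "/docs", 3));

        System.out.println(sumByUser(source)); // {alice=4, bob=2}
        System.out.println(sumByPage(source)); // {/docs=3, /home=3}
    }
}
```

The point of the sketch is that both aggregations start from the same
source object, which mirrors calling map(...) twice on stream1 rather than
registering the topic as a source a second time.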
