Eno,
Thanks for the response. The figure was just a restatement of my questions.
I have made an attempt at a low-level processor and it appears to work, but
it isn't very pretty and I was hoping for something at the Streams API level.

I have written some code to show an example of how I see cogroup
working in Kafka Streams.

First, KGroupedStream would have a cogroup method that takes the
initializer and the aggregator for that specific KGroupedStream. This would
return a KCogroupedStream that has two methods: one to add more
(KGroupedStream, Aggregator) pairs, and one to complete the construction and
return a KTable.

builder.stream("topic").groupByKey()
    .cogroup(Initializer, Aggregator, aggValueSerde, storeName)
    .cogroup(groupedStream1, Aggregator1)
    .cogroup(groupedStream2, Aggregator2)
    .aggregate();

Behind the scenes we create a KStreamAggregate for each (KGroupedStream,
Aggregator) pair, followed by a final pass-through processor that forwards
the aggregate values. This gives us a KTable backed by a single store that
is shared by all of the processors.
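
To make the intended semantics concrete, here is a rough plain-Java model of
the idea. It does not use any Kafka classes and it is not the code on my
branch. CogroupSketch, processorFor, and the nested Aggregator interface are
names made up for this illustration, and an in-memory map stands in for the
state store. Each (stream, aggregator) pair gets its own processor, and all
of them read and write the same store:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Plain-Java model of the proposed cogroup semantics (hypothetical names,
// no Kafka classes). K is the key type, A is the shared aggregate type.
final class CogroupSketch<K, A> {

    // Stand-in for an aggregator: (key, value, aggregate) -> new aggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    private final Supplier<A> initializer;
    // The single store shared by every per-stream processor.
    private final Map<K, A> store = new LinkedHashMap<>();
    // Stand-in for the final pass-through step: records each forwarded update.
    private final List<Map.Entry<K, A>> forwarded = new ArrayList<>();

    CogroupSketch(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    // Builds one processor for a (stream, aggregator) pair. Streams may have
    // different value types V, but all of them update the same aggregate A.
    <V> BiConsumer<K, V> processorFor(Aggregator<K, V, A> aggregator) {
        return (key, value) -> {
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            A updated = aggregator.apply(key, value, current);
            store.put(key, updated);
            forwarded.add(new AbstractMap.SimpleImmutableEntry<>(key, updated));
        };
    }

    Map<K, A> table() {
        return store;
    }

    List<Map.Entry<K, A>> forwarded() {
        return forwarded;
    }

    // Two streams with different value types feeding one Long aggregate.
    static CogroupSketch<String, Long> demo() {
        CogroupSketch<String, Long> cogroup = new CogroupSketch<>(() -> 0L);
        BiConsumer<String, Integer> counts = cogroup.processorFor((k, v, agg) -> agg + v);
        BiConsumer<String, String> words = cogroup.processorFor((k, v, agg) -> agg + v.length());
        counts.accept("a", 2);     // a: 0 + 2
        words.accept("a", "xyz");  // a: 2 + 3
        counts.accept("b", 5);     // b: 0 + 5
        return cogroup;
    }
}
```

The point of the sketch is only that no intermediate per-stream tables or
joins are needed: every record from any input updates one aggregate per key,
and each update is forwarded downstream as it happens.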

Please let me know if this is something you think would add value to Kafka
Streams, and I will try to create a KIP to foster more communication.

You can take a look at what I have. I think it's missing a fair amount but
it's a good start. I took the doAggregate method in KGroupedStream as my
starting point and expanded on it for multiple streams:
https://github.com/KyleWinkelman/kafka/tree/cogroup
