Eno,

Thanks for the response. The figure was just a restatement of my questions. I have made an attempt at a low-level processor; it appears to work, but it isn't very pretty, and I was hoping for something at the Streams API level.
I have written some code to show an example of how I see cogroup working in Kafka Streams. First, KGroupedStream would have a cogroup method that takes the initializer and the aggregator for that specific KGroupedStream. This would return a KCogroupedStream with two methods: one to add more (KGroupedStream, Aggregator) pairs, and one to complete the construction and return a KTable.

    builder.stream("topic")
        .groupByKey()
        .cogroup(Initializer, Aggregator, aggValueSerde, storeName)
        .cogroup(groupedStream1, Aggregator1)
        .cogroup(groupedStream2, Aggregator2)
        .aggregate();

Behind the scenes we create a KStreamAggregate for each (KGroupedStream, Aggregator) pair, followed by a final pass-through processor that forwards the aggregate values. This gives us a KTable backed by a single store that is shared by all of the processors.

Please let me know if this is something you think would add value to Kafka Streams, and I will try to create a KIP to foster more discussion.

You can take a look at what I have so far. I think it's missing a fair amount, but it's a good start. I took the doAggregate method in KGroupedStream as my starting point and expanded it to handle multiple streams: https://github.com/KyleWinkelman/kafka/tree/cogroup
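
To make the intended semantics concrete without depending on Kafka, here is a minimal plain-Java sketch of the idea: several input streams with different value types, each with its own aggregator, all folding into one shared store. The names CogroupSketch, Cogrouped, process, snapshot and demo are hypothetical and exist only for this illustration, and the Aggregator interface is a local stand-in rather than the Kafka Streams one. It is not the code on the branch above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class CogroupSketch {

    /** Local stand-in (assumption) for an aggregator of values V into an aggregate T. */
    interface Aggregator<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    /** Hypothetical model of the single store that backs the resulting table. */
    static final class Cogrouped<K, T> {
        private final Supplier<T> initializer;
        private final Map<K, T> store = new HashMap<>();

        Cogrouped(Supplier<T> initializer) {
            this.initializer = initializer;
        }

        /**
         * Plays the role of one per-stream aggregate processor: folds a single
         * record into the shared store using that stream's own aggregator.
         */
        <V> T process(K key, V value, Aggregator<K, V, T> aggregator) {
            T current = store.containsKey(key) ? store.get(key) : initializer.get();
            T updated = aggregator.apply(key, value, current);
            store.put(key, updated);
            return updated;
        }

        /** Returns a copy of the current contents of the shared store. */
        Map<K, T> snapshot() {
            return new HashMap<>(store);
        }
    }

    /** Feeds records from two differently typed "streams" into one store. */
    static Map<String, Long> demo() {
        Cogrouped<String, Long> cogrouped = new Cogrouped<>(() -> 0L);

        // One aggregator per input stream; both produce the same aggregate type.
        Aggregator<String, Integer, Long> addInts = (k, v, agg) -> agg + v;
        Aggregator<String, String, Long> addLengths = (k, v, agg) -> agg + v.length();

        cogrouped.process("a", 5, addInts);        // stream 1
        cogrouped.process("a", "xyz", addLengths); // stream 2
        cogrouped.process("b", "hi", addLengths);  // stream 2
        cogrouped.process("a", 2, addInts);        // stream 1

        return cogrouped.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy run, key "a" ends at 10 (5 + 3 + 2) and key "b" at 2, because both aggregators read and write the same entry per key. That shared entry is the property the single-store design is meant to provide.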