Ok, I'll try and map my problem into something that should be familiar to most people.

Consider a collection of PCs, each of which has a unique ID, e.g. ca:fe:ba:be, de:ad:be:ef, etc.

Each PC has a tree of local files. Some of the file paths are coincidentally the same, but there is no file sharing between PCs.

I need to produce metrics about how often files are opened and how long they are open for.

For every X minute tumbling window I need not just the cumulative averages for each PC, but also the averages for each file and the cumulative averages for each folder and its sub-folders.

I have a stream of events like:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
{"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}

So from that I would like to know stuff like:

ca:fe:ba:be had 4/X opens per minute in the X minute window
ca:fe:ba:be had 3/X closes per minute in the X minute window and the average time open was (67+97+196)/3=120... there is no guarantee that the closes will be matched with opens in the same window, which is why I'm only tracking them separately
de:ad:be:ef had 2/X opens per minute in the X minute window
ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the average time open was 120
de:ad:be:ef /foo had 1/X opens per minute in the X minute window
de:ad:be:ef /bar had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute window
etc

What I think I want to do is turn each event into a series of events with different keys, so that

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

gets sent under the keys:

("ca:fe:ba:be","/")
("ca:fe:ba:be","/foo")
("ca:fe:ba:be","/foo/bar")
("ca:fe:ba:be","/foo/bar/README.txt")

Then I could use a window aggregation function to just:

* count the "open" events
* count the "close" events and sum their duration

Additionally, I am (naïvely) hoping that if a window has no events for a particular key, the memory/storage costs are zero for that key.

From what I can see, to achieve what I am trying to do, I could use a flatMap followed by a keyBy.

In other words, I take the events and flat map them based on the path split on '/', returning a Tuple of the (to be) key and the event. Then I can use keyBy to key based on field 0 of the Tuple.

My ask:

Is the above design a good design? How would you achieve the end game better?

Do I need to worry about many paths that are accessed rarely and would have an accumulator that stays at 0 unless there are events in that window... or are the accumulators for each distinct key eagerly purged after each fire trigger?
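
To make the fan-out concrete, here is a minimal sketch of the idea in plain Java with no Flink dependency. The names (PathFanOut, Stats, prefixes, accumulate, sampleWindow) are made up for illustration, the "source|path" string key is only a stand-in for the Tuple key, and a map plays the role of one window's per-key state. It is a sketch of what I have in mind, not a claim about how Flink manages its state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: none of these names come from Flink.
final class PathFanOut {

    /** Per-key accumulator for one window: open count, close count, summed close duration. */
    static final class Stats {
        long opens;
        long closes;
        long totalDuration;

        double averageDuration() {
            return closes == 0 ? 0.0 : (double) totalDuration / closes;
        }
    }

    /** Expands "/foo/bar/README.txt" into "/", "/foo", "/foo/bar", "/foo/bar/README.txt". */
    static List<String> prefixes(String path) {
        List<String> out = new ArrayList<>();
        out.add("/");
        StringBuilder current = new StringBuilder();
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // skip the empty segment before the leading slash
            }
            current.append('/').append(segment);
            out.add(current.toString());
        }
        return out;
    }

    /** Applies one event to every (source, prefix) key; duration is only used for "close". */
    static void accumulate(Map<String, Stats> window, String source, String action,
                           String path, long duration) {
        for (String prefix : prefixes(path)) {
            // A key only gets an accumulator once an event for it arrives in this window.
            Stats s = window.computeIfAbsent(source + "|" + prefix, k -> new Stats());
            if ("open".equals(action)) {
                s.opens++;
            } else if ("close".equals(action)) {
                s.closes++;
                s.totalDuration += duration;
            }
        }
    }

    /** Builds one window's state from the sample events in this mail. */
    static Map<String, Stats> sampleWindow() {
        Map<String, Stats> w = new LinkedHashMap<>();
        accumulate(w, "ca:fe:ba:be", "open", "/foo/bar/README.txt", 0);
        accumulate(w, "ca:fe:ba:be", "close", "/foo/bar/README.txt", 67);
        accumulate(w, "de:ad:be:ef", "open", "/foo/manchu/README.txt", 0);
        accumulate(w, "ca:fe:ba:be", "open", "/foo/bar/User guide.txt", 0);
        accumulate(w, "ca:fe:ba:be", "open", "/foo/bar/Admin guide.txt", 0);
        accumulate(w, "ca:fe:ba:be", "close", "/foo/bar/User guide.txt", 97);
        accumulate(w, "ca:fe:ba:be", "close", "/foo/bar/Admin guide.txt", 196);
        accumulate(w, "ca:fe:ba:be", "open", "/foo/manchu/README.txt", 0);
        accumulate(w, "de:ad:be:ef", "open", "/bar/foo/README.txt", 0);
        return w;
    }

    public static void main(String[] args) {
        sampleWindow().forEach((key, s) -> System.out.printf(
                "%s opens=%d closes=%d avg=%.1f%n",
                key, s.opens, s.closes, s.averageDuration()));
    }
}
```

Feeding the sample events through this gives 4 opens, 3 closes and an average of 120 for ("ca:fe:ba:be","/foo"), which matches the numbers I listed. In the real job, prefixes() would be the body of the flatMap and Stats would be the window accumulator.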

What gotchas do I need to look for?

Thanks in advance and apologies for the length

-stephenc