Re: Reduce one event under multiple keys

2019-02-21 Thread Stephen Connolly
Thanks!

On Mon, 18 Feb 2019 at 12:36, Fabian Hueske wrote:

> Hi Stephen,
>
> Sorry for the late response.
> If you don't need to match open and close events, your approach of using a
> flatMap to fan-out for the hierarchical folder structure and a window
> operator (or two for open and close) for counting and aggregating should be
> a good design.
>
> Best, Fabian

Re: Reduce one event under multiple keys

2019-02-18 Thread Fabian Hueske
Hi Stephen,

Sorry for the late response.
If you don't need to match open and close events, your approach of using a
flatMap to fan-out for the hierarchical folder structure and a window
operator (or two for open and close) for counting and aggregating should be
a good design.

Best, Fabian


Re: Reduce one event under multiple keys

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 09:42, Fabian Hueske wrote:

> Hi Stephen,
>
> A window is created with the first record that is assigned to it.
> If the windows are based on time and a key, then no window will be created
> (and no space will be occupied) if there is no first record for a key and
> time interval.
>
> Anyway, if tracking the number of open files & average opening time is
> your use case, you might want to implement the logic with a ProcessFunction
> instead of a window.
> The reason is that time windows don't share state, i.e., the
> information about an opened but not yet closed file would not be "carried
> over" to the next window.
> However, if you use a ProcessFunction, you are responsible for cleaning up
> the state.
>

Ahh but I am cheating by ensuring the events are rich enough that I do not
need to match them.

I get the "open" (they are not really "open" events - I have mapped them to an
analogy... it might be more like a build job start event... or not... I'm
not at liberty to say ;-) ) events because I need to count the number of
"open"s per time period.

I get the "close" events and they include the duration plus other
information that can then be transformed into the required metrics... yes I
could derive the "open" from the "close" by subtracting the duration but:

1. they would cross window boundaries quite often, leading to repeated
fetch-update-write operations on the backing data store
2. they wouldn't be as "live" and one of the things we need to know is how
many "open"s there are in the previous window... given some durations can
be many days, waiting for the "close" event to create the "open" metric
would not be a good plan.
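
Point 1 can be illustrated with a little arithmetic. This is only a sketch: the window size, the timestamps, and the assumption that "duration" is in seconds are all hypothetical, and the windows are assumed to be tumbling windows aligned to zero.

```python
def window_start(timestamp, size):
    """Start of the tumbling window (aligned to 0) that contains timestamp."""
    return timestamp - (timestamp % size)


WINDOW = 300      # hypothetical 5-minute window, in seconds
close_ts = 1000   # hypothetical arrival time of a "close" event
duration = 196    # duration carried by the event, assumed to be in seconds

# Deriving the "open" time from the "close" event, as discussed above.
derived_open_ts = close_ts - duration

close_window = window_start(close_ts, WINDOW)
open_window = window_start(derived_open_ts, WINDOW)

# The derived "open" lands in an earlier window than the "close",
# so that earlier window's stored result would have to be updated.
print(close_window, open_window)  # prints "900 600"
```

With durations of many days, the derived "open" would land many windows back, which is the repeated fetch-update-write cost described in point 1.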

Basically, I am pushing some of the calculations to the edge where there is
state that makes those calculations cheap and then the rich events are
*hopefully* easy to aggregate with just simple aggregation functions that
only need to maintain the running total... at least that's what the PoC I
am experimenting with in Flink should show.



Re: Reduce one event under multiple keys

2019-02-11 Thread Fabian Hueske
Hi Stephen,

A window is created with the first record that is assigned to it.
If the windows are based on time and a key, then no window will be created
(and no space will be occupied) if there is no first record for a key and
time interval.
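
The lazy creation described above can be pictured with a toy model. This is a plain Python sketch, not Flink's actual window operator: the class `LazyWindows`, its methods, and the watermark handling are hypothetical names chosen for illustration.

```python
class LazyWindows:
    """Toy model: state exists only for (key, window) pairs that saw a record."""

    def __init__(self, size):
        self.size = size
        self.state = {}  # (key, window_start) -> record count

    def add(self, key, timestamp):
        # The entry is created by the first record assigned to the window.
        start = timestamp - timestamp % self.size
        self.state[(key, start)] = self.state.get((key, start), 0) + 1

    def fire(self, watermark):
        """Emit and drop every window whose end is <= watermark."""
        done = sorted(k for k in self.state if k[1] + self.size <= watermark)
        return [(k, self.state.pop(k)) for k in done]


windows = LazyWindows(size=60)
windows.add("a", 5)
windows.add("a", 10)
windows.add("b", 70)

first = windows.fire(60)    # only key "a" had records in window [0, 60)
second = windows.fire(120)  # only key "b" had records in window [60, 120)
```

In this model key "a" occupies nothing in the second window because no record for it arrived there.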

Anyway, if tracking the number of open files & average opening time is your
use case, you might want to implement the logic with a ProcessFunction
instead of a window.
The reason is that time windows don't share state, i.e., the
information about an opened but not yet closed file would not be "carried
over" to the next window.
However, if you use a ProcessFunction, you are responsible for cleaning up
the state.
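
As a rough illustration of that trade-off, here is a plain Python stand-in for keyed state with a cleanup timer. It is not Flink code: `OpenFileTracker`, `on_event`, `on_timer`, and the `ttl` parameter are hypothetical names, and the timestamps are made up.

```python
class OpenFileTracker:
    """Toy stand-in for keyed state plus a timer that cleans it up."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.opened_at = {}  # (source, path) -> timestamp of the "open"

    def on_event(self, event, timestamp):
        key = (event["source"], event["path"])
        if event["action"] == "open":
            # State survives across window boundaries until it is matched.
            self.opened_at[key] = timestamp
            return None
        opened = self.opened_at.pop(key, None)
        return None if opened is None else timestamp - opened

    def on_timer(self, now):
        """Drop entries older than ttl; nothing removes them otherwise."""
        stale = [k for k, t in self.opened_at.items() if now - t >= self.ttl]
        for k in stale:
            del self.opened_at[k]
        return len(stale)


tracker = OpenFileTracker(ttl=100)
readme = {"source": "ca:fe:ba:be", "path": "/foo/bar/README.txt"}
guide = {"source": "ca:fe:ba:be", "path": "/foo/bar/User guide.txt"}

tracker.on_event({**readme, "action": "open"}, 10)
open_time = tracker.on_event({**readme, "action": "close"}, 77)  # 67
tracker.on_event({**guide, "action": "open"}, 20)  # never closed
removed = tracker.on_timer(120)  # the unmatched "open" is purged here
```

Without the `on_timer` step the unmatched entry would stay in state indefinitely, which is the cleanup responsibility mentioned above.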

Hope this helps,
Fabian


Re: Reduce one event under multiple keys

2019-02-10 Thread Stephen Connolly
On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler wrote:

> This sounds reasonable to me.
>
> I'm a bit confused by this question: "*Additionally, I am (naïvely)
> hoping that if a window has no events for a particular key, the
> memory/storage costs are zero for that key.*"
>
> Are you asking whether a key that was received in window X (as part of an
> event) is still present in window X+1? If so, then the answer is no; a key
> will only be present in a given window if an event was received that fits
> into that window.
>

To confirm:

So let's say I'm tracking the average time a file is opened in folders.

In window N we get the events:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}

So there will be aggregates stored for
("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
("ca:fe:ba:be","/foo/bar/README.txt"), etc

In window N+1 we do not get any events at all.

So the memory used by my aggregation functions from window N will be freed
and the storage will be effectively zero (modulo any follow on processing
that might be on a longer window)

This seems to be what you are saying... in which case my naïve hope was
not so naïve! w00t!



Re: Reduce one event under multiple keys

2019-02-10 Thread Chesnay Schepler

This sounds reasonable to me.

I'm a bit confused by this question: "/Additionally, I am (naïvely)
hoping that if a window has no events for a particular key, the 
memory/storage costs are zero for that key./"


Are you asking whether a key that was received in window X (as part of 
an event) is still present in window X+1? If so, then the answer is no;
a key will only be present in a given window if an event was received 
that fits into that window.



Reduce one event under multiple keys

2019-02-08 Thread Stephen Connolly
Ok, I'll try and map my problem into something that should be familiar to
most people.

Consider a collection of PCs, each of which has a unique ID, e.g.
ca:fe:ba:be, de:ad:be:ef, etc.

Each PC has a tree of local files. Some of the file paths coincidentally
have the same names, but there is no file sharing between PCs.

I need to produce metrics about how often files are opened and how long
they are open for.

For every X minute tumbling window I need not just the cumulative averages
for each PC, but also the averages for each file and the cumulative
averages for each folder and its sub-folders.

I have a stream of events like

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
{"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}

So from that I would like to know stuff like:

ca:fe:ba:be had 4/X opens per minute in the X minute window
ca:fe:ba:be had 3/X closes per minute in the X minute window and the
average time open was (67+97+196)/3=120... there is no guarantee that the
closes will be matched with opens in the same window, which is why I'm only
tracking them separately
de:ad:be:ef had 2/X opens per minute in the X minute window
ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
average time open was 120
de:ad:be:ef /foo had 1/X opens per minute in the X minute window
de:ad:be:ef /bar had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute
window
de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
window
etc
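
The per-PC figures above can be checked with a few lines of plain Python. This is only a sanity check of the arithmetic on the sample events, not a proposed implementation; the function name `per_source_totals` is made up for illustration, and dividing by X is left out because X is not specified.

```python
from collections import defaultdict

# The sample events from this message (durations are strings, as in the JSON).
EVENTS = [
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/README.txt"},
    {"source": "ca:fe:ba:be", "action": "close", "path": "/foo/bar/README.txt", "duration": "67"},
    {"source": "de:ad:be:ef", "action": "open", "path": "/foo/manchu/README.txt"},
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/User guide.txt"},
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/Admin guide.txt"},
    {"source": "ca:fe:ba:be", "action": "close", "path": "/foo/bar/User guide.txt", "duration": "97"},
    {"source": "ca:fe:ba:be", "action": "close", "path": "/foo/bar/Admin guide.txt", "duration": "196"},
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/manchu/README.txt"},
    {"source": "de:ad:be:ef", "action": "open", "path": "/bar/foo/README.txt"},
]


def per_source_totals(events):
    """Count opens and closes and sum close durations for each source."""
    totals = defaultdict(lambda: {"opens": 0, "closes": 0, "duration_sum": 0})
    for e in events:
        t = totals[e["source"]]
        if e["action"] == "open":
            t["opens"] += 1
        elif e["action"] == "close":
            t["closes"] += 1
            t["duration_sum"] += int(e["duration"])
    return dict(totals)


totals = per_source_totals(EVENTS)
cafe = totals["ca:fe:ba:be"]
print(cafe["opens"], cafe["closes"], cafe["duration_sum"] / cafe["closes"])
# prints "4 3 120.0"
```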

What I think I want to do is turn each event into a series of events with
different keys, so that

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

gets sent under the keys:

("ca:fe:ba:be","/")
("ca:fe:ba:be","/foo")
("ca:fe:ba:be","/foo/bar")
("ca:fe:ba:be","/foo/bar/README.txt")

Then I could use a window aggregation function to just:

* count the "open" events
* count the "close" events and sum their duration
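
A minimal sketch of such an accumulator, in plain Python. The split into add / merge / result steps loosely mirrors the shape of Flink's AggregateFunction interface, but none of this is Flink code: the names `Accumulator`, `add`, `merge`, `result`, and the `window_minutes` parameter are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class Accumulator:
    opens: int = 0
    closes: int = 0
    duration_sum: int = 0


def add(acc, event):
    """Fold one event into the running totals."""
    if event["action"] == "open":
        acc.opens += 1
    elif event["action"] == "close":
        acc.closes += 1
        acc.duration_sum += int(event["duration"])
    return acc


def merge(a, b):
    """Combine two partial accumulators for the same key and window."""
    return Accumulator(a.opens + b.opens,
                       a.closes + b.closes,
                       a.duration_sum + b.duration_sum)


def result(acc, window_minutes):
    """Turn the totals into per-minute rates and an average open time."""
    avg = acc.duration_sum / acc.closes if acc.closes else None
    return {
        "opens_per_minute": acc.opens / window_minutes,
        "closes_per_minute": acc.closes / window_minutes,
        "avg_open_time": avg,
    }


acc = Accumulator()
add(acc, {"action": "open"})
add(acc, {"action": "close", "duration": "67"})
add(acc, {"action": "close", "duration": "97"})
summary = result(acc, window_minutes=2)
```

Only three integers are kept per key and window, which is what makes the "running total" approach cheap.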

Additionally, I am (naïvely) hoping that if a window has no events for a
particular key, the memory/storage costs are zero for that key.

From what I can see, to achieve what I am trying to do, I could use a
flatMap followed by a keyBy

In other words I take the events and flat map them based on the path split
on '/' returning a Tuple of the (to be) key and the event. Then I can use
keyBy to key based on the Tuple 0.
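
The fan-out step described here can be sketched in plain Python as a generator that emits one (key, event) pair per ancestor path. This only illustrates the logic that would sit inside the flatMap; the function names `ancestor_paths` and `fan_out` are hypothetical, and paths are assumed to be absolute and '/'-separated.

```python
def ancestor_paths(path):
    """Return "/" plus every prefix of an absolute path, shortest first."""
    parts = [p for p in path.split("/") if p]
    prefixes = ["/"]
    for i in range(1, len(parts) + 1):
        prefixes.append("/" + "/".join(parts[:i]))
    return prefixes


def fan_out(event):
    """Yield ((source, prefix), event) for each prefix of the event's path."""
    for prefix in ancestor_paths(event["path"]):
        yield (event["source"], prefix), event


event = {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/README.txt"}
keys = [key for key, _ in fan_out(event)]
for key in keys:
    print(key)
```

For the sample event this produces the four keys listed earlier, from ("ca:fe:ba:be", "/") down to the full file path, so an event at depth d is emitted d+1 times.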

My ask:

Is the above design a good design? How would you achieve the end game
better? Do I need to worry about many paths that are accessed rarely and
would have an accumulator function that stays at 0 unless there are events
in that window... or are the accumulators for each distinct key eagerly
purged each time the trigger fires?

What gotchas do I need to look out for?

Thanks in advance, and apologies for the length.

-stephenc