On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Stephen,
>
> A window is created with the first record that is assigned to it.
> If the windows are based on time and a key, than no window will be created
> (and not space be occupied) if there is not a first record for a key and
> time interval.
>
> Anyway, if tracking the number of open files & average opening time is
> your use case, you might want to implement the logic with a ProcessFunction
> instead of a window.
> The reason is that it is that time windows don't share state, i.e., the
> information about an opened but not yet closed file would not be "carried
> over" to the next window.
> However, if you use a ProcessFunction, you are responsible for cleaning up
> the state.
>

Ahh but I am cheating by ensuring the events are rich enough that I do not
need to match them.

I get the "open" (they are not really "open" events - I have mapped to an
analogy... it might be more like a build job start events... or not... I'm
not at liberty to say ;-) ) events because I need to count the number of
"open"s per time period.

I get the "close" events and they include the duration plus other
information that can then be transformed into the required metrics... yes I
could derive the "open" from the "close" by subtracting the duration but:

1. they would cross window boundaries quite often, leading to repeated
fetch-update-write operations on the backing data store
2. they wouldn't be as "live" and one of the things we need to know is how
many "open"s there are in the previous window... given some durations can
be many days, waiting for the "close" event to create the "open" metric
would not be a good plan.

Basically, I am pushing some of the calculations to the edge where there is
state that makes those calculations cheap and then the rich events are
*hopefully* easy to aggregate with just simple aggregation functions that
only need to maintain the running total... at least that's what the PoC I
am experimenting with Flink should show


>
> Hope this helps,
> Fabian
>
> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>>
>>
>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> This sounds reasonable to me.
>>>
>>> I'm a bit confused by this question: "*Additionally, I am (naïevely)
>>> hoping that if a window has no events for a particular key, the
>>> memory/storage costs are zero for that key.*"
>>>
>>> Are you asking whether a key that was received in window X (as part of
>>> an event) is still present in window x+1? If so, then the answer is no; a
>>> key will only be present in a given window if an event was received that
>>> fits into that window.
>>>
>>
>> To confirm:
>>
>> So let's say I'l tracking the average time a file is opened in folders.
>>
>> In window N we get the events:
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>
>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>
>> So there will be aggregates stored for
>> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>>
>> In window N+1 we do not get any events at all.
>>
>> So the memory used by my aggregation functions from window N will be
>> freed and the storage will be effectively zero (modulo any follow on
>> processing that might be on a longer window)
>>
>> This seems to be what you are saying... in which case my naïeve hope was
>> not so naïve! w00t!
>>
>>
>>>
>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>
>>> Ok, I'll try and map my problem into something that should be familiar
>>> to most people.
>>>
>>> Consider collection of PCs, each of which has a unique ID, e.g.
>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>
>>> Each PC has a tree of local files. Some of the file paths are
>>> coincidentally the same names, but there is no file sharing between PCs.
>>>
>>> I need to produce metrics about how often files are opened and how long
>>> they are open for.
>>>
>>> I need for every X minute tumbling window not just the cumulative
>>> averages for each PC, but the averages for each file as well as the
>>> cumulative averegaes for each folder and their sub-folders.
>>>
>>> I have a stream of events like
>>>
>>>
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>>> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>>> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
>>> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
>>> guide.txt","duration":"196"}
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>>
>>> So from that I would like to know stuff like:
>>>
>>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>>> average time open was (67+97+197)/3=120... there is no guarantee that the
>>> closes will be matched with opens in the same window, which is why I'm only
>>> tracking them separately
>>> de:ad:be:ef had 2/X opens per minute in the X minute window
>>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and
>>> the average time open was 120
>>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>>> minute window
>>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
>>> window
>>> etc
>>>
>>> What I think I want to do is turn each event into a series of events
>>> with different keys, so that
>>>
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>
>>> gets sent under the keys:
>>>
>>> ("ca:fe:ba:be","/")
>>> ("ca:fe:ba:be","/foo")
>>> ("ca:fe:ba:be","/foo/bar")
>>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>>
>>> Then I could use a window aggregation function to just:
>>>
>>> * count the "open" events
>>> * count the "close" events and sum their duration
>>>
>>> Additionally, I am (naïevely) hoping that if a window has no events for
>>> a particular key, the memory/storage costs are zero for that key.
>>>
>>> From what I can see, to achieve what I am trying to do, I could use a
>>> flatMap followed by a keyBy
>>>
>>> In other words I take the events and flat map them based on the path
>>> split on '/' returning a Tuple of the (to be) key and the event. Then I can
>>> use keyBy to key based on the Tuple 0.
>>>
>>> My ask:
>>>
>>> Is the above design a good design? How would you achieve the end game
>>> better? Do I need to worry about many paths that are accessed rarely and
>>> would have an accumulator function that stays at 0 unless there are events
>>> in that window... or are the accumulators for each distinct key eagerly
>>> purged after each fire trigger.
>>>
>>> What gotcha's do I need to look for.
>>>
>>> Thanks in advance and appologies for the length
>>>
>>> -stephenc
>>>
>>>
>>>

Reply via email to