Hi Dongwon,

Do you need the number of active session windows as a DataStream, or would
you like to have it as a metric that you can expose?
If possible, I would recommend exposing it as a metric, because metrics are
usually easier to collect.

SessionWindows work internally as follows:
- every new record is added to a new window that starts at the timestamp of
the record and ends at timestamp + gap size. When a record is added to a
window, Trigger.onElement() is called.
- after a window is created, the session window assigner tries to merge
windows with overlapping ranges. When windows are merged, Trigger.onMerge()
is called.
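
To make the assign-then-merge behaviour concrete, here is a minimal sketch in
plain Java with no Flink dependency. It is not Flink's implementation: the
names SessionMergeSketch, Window and add are made up for illustration, and
treating ranges that merely touch as overlapping is an assumption of the
sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of session window assignment for a single key.
final class SessionMergeSketch {

    /** A time range from start to end (illustrative, not Flink's TimeWindow). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Assumption: ranges that touch count as overlapping. */
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        /** Smallest window that covers both this window and the other one. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    /**
     * Puts a record into a new window [timestamp, timestamp + gap) and merges
     * it with every active window it overlaps. Returns the new active set.
     */
    static List<Window> add(List<Window> active, long timestamp, long gap) {
        Window merged = new Window(timestamp, timestamp + gap);
        List<Window> result = new ArrayList<>();
        for (Window w : active) {
            if (w.intersects(merged)) {
                merged = merged.cover(w);   // absorbed into the merged window
            } else {
                result.add(w);              // untouched by this record
            }
        }
        result.add(merged);
        return result;
    }

    public static void main(String[] args) {
        long gap = 10L;
        List<Window> active = new ArrayList<>();
        for (long ts : new long[] {0L, 5L, 100L, 12L}) {
            active = add(active, ts, gap);
            System.out.println("after record at " + ts + ": " + active.size() + " window(s)");
        }
        // Window counts after each record: 1, 1, 2, 2
    }
}
```

The record at 5 is merged into the window started by the record at 0, while
the record at 100 is further away than the gap and keeps its own window.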

In order to track how many session windows exist, we would need to
increment a counter by one when a new window is created (or an element is
assigned to a window, which is equivalent for session windows) and
decrement the counter when windows are merged by the number of merged
windows minus one.
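
The bookkeeping can be written down as a small plain-Java sketch. The class
and method names are hypothetical and nothing here is a Flink API. The
onWindowClosed() step is an assumption that goes beyond the paragraph above:
a window that is closed should presumably leave the count as well.

```java
// Hypothetical counter for active session windows; not part of Flink.
final class ActiveSessionCounterSketch {
    private long active;

    /** Every record first gets its own new window: +1. */
    void onWindowCreated() {
        active++;
    }

    /** mergedWindows windows collapse into a single one: -(mergedWindows - 1). */
    void onWindowsMerged(int mergedWindows) {
        active -= (mergedWindows - 1);
    }

    /** Assumption: a closed window no longer counts as active. */
    void onWindowClosed() {
        active--;
    }

    long active() {
        return active;
    }

    public static void main(String[] args) {
        ActiveSessionCounterSketch counter = new ActiveSessionCounterSketch();
        counter.onWindowCreated();    // first record: 1 active window
        counter.onWindowCreated();    // second record gets its own window: 2
        counter.onWindowsMerged(2);   // both windows overlap and merge: 1
        counter.onWindowCreated();    // a distant record opens a new session: 2
        counter.onWindowClosed();     // the first session closes: 1
        System.out.println(counter.active()); // prints 1
    }
}
```

The hard part in a real job is the argument of onWindowsMerged(), because
that number is not available in Trigger.onMerge().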

Incrementing the counter is rather easy and can be done in
Trigger.onElement(), either by using state or a Counter metric (Triggers
have access to the metric system).
However, decrementing the counter is difficult. Although the
Trigger.onMerge() method is called, it does not know how many windows were
merged (which is done by the WindowAssigner) and only sees the merged
window. There might be a way to maintain state in a Trigger that allows it
to infer how many windows were merged.

Best, Fabian

2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:

> Hi Fabian,
>
> I'm still eager to expose # of active sessions as a key metric of our
> service but I haven’t figured it out yet.
>
> First of all, I want to ask you some questions regarding your suggestion.
>
> You could implement a Trigger that fires when a new window is created and
> when the window is closed. A ProcessWindowFunction would emit a +1 if the
> window was created and a -1 when the window is closed.
> Session windows are a bit special, because you also need to handle the
> case of merging windows, i.e., two opened windows can be merged and only
> one (the merged) window is closed. So you would need to emit a -2 if a
> merged window was closed (assuming only two windows were merged).
>
> Q1)
> How to fire when a new window is created and when the window is closed?
> AFAIK, we can return TriggerResult only through the three functions:
> onElement, onEventTime, and onProcessingTime.
> Q2)
> Firing is to emit elements in windows down to the window function, not
> emitting values like +1, -1 and -2 which are not in windows.
> Or do I miss something that you meant?
>
> In order to do that, you'd need to carry the merging information forward.
> The Trigger.onMerge method cannot trigger the window function, but it could
> store the merging information in state that is later accessed.
>
> Q3)
> I didn't understand what you mean at all. What do you mean by carrying the
> merging information?
>
> Besides your suggestion, I implemented a custom trigger which is almost
> the same as EventTimeTrigger except for the following:
> - it maintains a variable to count sessions in an instance of a window
> operator
> - it increases the variable by 1 when onElement is invoked
> - it decreases the variable by 1 when onClose is invoked
> Considering the logic of Flink’s session window, it correctly counts
> sessions in an instance of a window operator.
>
> As you might have already noticed, this approach has a critical problem:
> there's no way to maintain operator state inside a trigger.
> TriggerContext only allows interaction with state that is scoped to the
> window and the key of the current trigger invocation (as shown in
> Trigger#TriggerContext).
>
> Now I've come to the conclusion that it might not be possible using the
> DataStream API.
> Otherwise, do I need to think in a totally different way to achieve the
> goal?
>
> Best,
>
> - Dongwon
>
>
>
> On 2018-02-20 at 6:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Dongwon Kim,
>
> That's an interesting question.
>
> I don't have a solution blueprint for you, but a few ideas that should
> help to solve the problem.
>
> I would start with a separate job first and later try to integrate it with
> the other job.
> You could implement a Trigger that fires when a new window is created and
> when the window is closed. A ProcessWindowFunction would emit a +1 if the
> window was created and a -1 when the window is closed.
> Session windows are a bit special, because you also need to handle the
> case of merging windows, i.e., two opened windows can be merged and only
> one (the merged) window is closed. So you would need to emit a -2 if a
> merged window was closed (assuming only two windows were merged).
> In order to do that, you'd need to carry the merging information forward.
> The Trigger.onMerge method cannot trigger the window function, but it could
> store the merging information in state that is later accessed.
>
> Hope this helps,
> Fabian
>
> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <eastcirc...@gmail.com>:
>
>> Hi,
>>
>> It could be a totally stupid question but I currently have no idea how to
>> get the number of active session windows from a running job.
>>
>> Our traffic trajectory application (which handles up to 10,000 tps) uses
>> an event-time session window on a KeyedStream (keyed by userID).
>>
>> Should I write another Flink job for the purpose?
>>
>> Cheers,
>>
>> Dongwon Kim
>
>
>
>
