Hi,

I’m not sure, but I don’t think there is an existing window that would do 
exactly what you want. I would suggest to go back to the `keyedProcessFunction` 
(or a custom operator?), and have a MapState<TimeStamp, StateWithTimeStamp> 
currentStates field. Your key would be for example a timestamp of the beginning 
of your window. Value would be the latest state in this time window, annotated 
with a timestamp when this state was record.

On each element:

1. you determine the window’s begin ts (key of the map)
2. If it’s first element, register an event time timer to publish results for 
that window’s end TS
3. look into the `currentStates` if it should be modified (if your new element 
is newer or first value for the given key)

On even time timer firing
1. output the state matching to this timer
2. Check if there is a (more recent) value for next window, and if not:
 
3. copy the value to next window
4. Register a timer for this window to fire

5. Cleanup currentState and remove value for the no longed needed key.

I hope this helps

Piotrek 

> On 27 Apr 2020, at 12:01, Manas Kale <[email protected]> wrote:
> 
> Hi,
> I have an upstream operator that outputs device state transition messages 
> with event timestamps. Meaning it only emits output when a transition takes 
> place.
> For example, 
> state1 @ 1 PM
> state2 @ 2 PM
> and so on. 
> 
> Using a downstream operator, I want to emit notification messages as per some 
> configured periodicity. For example, if periodicity = 20 min, in the above 
> scenario this operator will output : 
> state1 notification @ 1PM
> state1 notification @ 1.20PM
> state1 notification @ 1.40PM
>  ...
> 
> Now the main issue is that I want this to be driven by the watermark and not 
> by transition events received from upstream. Meaning I would like to see 
> notification events as soon as the watermark crosses their timestamps; not 
> when the next transition event arrives at the operator (which could be hours 
> later, as above).
> 
> My first solution, using a keyedProcessFunction and timers did not work as 
> expected because the order in which transition events arrived at this 
> operator was non-deterministic. To elaborate, assume a 
> setAutoWatermarkInterval of 10 second.
> If we get transition events :
> state1 @ 1sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my keyedProcessFunction was not 
> fixed. To solve this, these messages need to be sorted on event time, which 
> led me to my second solution.
> 
> My second solution, using a EventTimeTumblingWindow with size = 
> setAutoWatermarkInterval, also does not work. I sorted accumulated events in 
> the window and applied notification-generation logic on them in order. 
> However, I assumed that windows are created even if there are no elements. 
> Since this is not the case, this solution generates notifications only when 
> the next state tranisition message arrives, which could be hours later.
> 
> Does anyone have any suggestions on how I can implement this?
> Thanks!
> 
> 
> 

Reply via email to