Hi Oleg,

With the MapState approach you can still fire on every
incoming element :)
You just iterate over the map state and pick all the elements whose
timestamp (the map key) lies between NOW - N and NOW, where NOW is the
timestamp of the current element.
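A minimal plain-Java sketch of this per-element approach, assuming one instance per key. A `java.util.HashMap` stands in for Flink's MapState (timestamp -> values seen at that timestamp), the aggregate is a hypothetical sum of `double` values, and the class and method names are illustrative, not Flink API. Because MapState offers no range lookup, the sketch iterates over all entries and filters by timestamp, as described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only, not Flink API: the HashMap plays the role of a
// MapState<Long, List<Double>> for one key (timestamp -> values seen at it).
class EagerLookback {
    private final long n; // look-back length N, same unit as the timestamps
    private final Map<Long, List<Double>> state = new HashMap<>();

    EagerLookback(long n) {
        this.n = n;
    }

    // Fires on every incoming element: stores it, then sums all values whose
    // timestamp lies in [ts - N, ts], the current element included.
    double onElement(long ts, double value) {
        state.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        double sum = 0.0;
        for (Map.Entry<Long, List<Double>> e : state.entrySet()) {
            long key = e.getKey();
            if (key >= ts - n && key <= ts) {
                for (double v : e.getValue()) {
                    sum += v;
                }
            }
        }
        return sum;
    }

    // Clean-up when the watermark advances: an on-time element has
    // ts >= watermark, so it never needs timestamps below watermark - N.
    void onWatermark(long watermark) {
        state.keySet().removeIf(k -> k < watermark - n);
    }

    int bufferedTimestamps() {
        return state.size();
    }

    public static void main(String[] args) {
        EagerLookback w = new EagerLookback(1000);
        System.out.println(w.onElement(0, 1.0));    // window [-1000, 0]
        System.out.println(w.onElement(500, 2.0));  // window [-500, 500]
        System.out.println(w.onElement(1500, 4.0)); // window [500, 1500]
        System.out.println(w.onElement(1000, 8.0)); // window [0, 1000]
        w.onWatermark(1200);                        // drops timestamps below 200
        System.out.println(w.bufferedTimestamps());
    }
}
```

Here the element stamped 1000 arrives after the one stamped 1500, so it is missing from the result already emitted for 1500: firing eagerly trades completeness under out-of-order input for low latency. Late elements (timestamp below the watermark) are not handled by this sketch.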

Anyway, if Fanbin's solution works, then you can always use that!

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:18 PM Олег Бонарь <olegpel...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thanks for your reply!
> Yes, you understood me correctly. However, I also want the stream to be keyed
> so that it is processed in parallel. I'm afraid the MapState approach you
> suggested doesn't really suit my use case, because I need to fire on every
> incoming event.
> Logically, Fanbin's "RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT
> ROW" looks 100% like what I need, but I haven't tried it yet.
> I'm also wondering whether it can be expressed in the DataStream API.
>
> On Wed, Feb 12, 2020 at 13:06 Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>> Hi Oleg,
>>
>> Could you be more specific about what you mean by
>> "for events of last n seconds(time units in general) for every incoming 
>> event."?
>>
>> Do you mean that you have a stream with parallelism 1 and, for each
>> incoming element, you want your function to fire with the event itself
>> and all the events that arrived within the last N time units as input?
>> If so, you can key your stream by a dummy key to get access to keyed
>> state, then use a MapState whose key is the timestamp and whose value
>> is the list of already seen elements with that timestamp. Whenever an
>> element arrives, you can register a timer to fire N time units in the
>> future. Then, when the timer fires, you can iterate over the map, fetch
>> the elements you are interested in, and clean up whatever you no longer
>> need.
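A plain-Java sketch of this timer-driven variant, under stated assumptions: a single dummy key (so one instance holds all state), a `HashMap` in place of MapState, a `TreeSet` of timestamps in place of Flink's event-time timer service (advancing the watermark fires due timers in timestamp order), and a sum as the aggregate. Names are illustrative, not Flink API. This is one reading of the description: the timer registered at ts + N emits the result for the elements stamped ts, whose range [ts - N, ts] is complete by then because the watermark has passed it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Sketch only: HashMap ~ MapState<Long, List<Double>>, TreeSet ~ event-time timers.
class TimerLookback {
    private final long n; // look-back length N
    private final Map<Long, List<Double>> state = new HashMap<>();
    private final TreeSet<Long> timers = new TreeSet<>(); // one timer per timestamp

    TimerLookback(long n) {
        this.n = n;
    }

    // processElement: buffer the element and register a timer N units ahead.
    void onElement(long ts, double value) {
        state.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        timers.add(ts + n);
    }

    // Advancing the watermark fires every due timer in timestamp order and
    // returns elementTimestamp -> sum over [elementTimestamp - N, elementTimestamp].
    Map<Long, Double> onWatermark(long watermark) {
        Map<Long, Double> results = new LinkedHashMap<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long elementTs = timers.pollFirst() - n;
            double sum = 0.0;
            for (Map.Entry<Long, List<Double>> e : state.entrySet()) {
                long key = e.getKey();
                if (key >= elementTs - n && key <= elementTs) {
                    for (double v : e.getValue()) {
                        sum += v;
                    }
                }
            }
            results.put(elementTs, sum);
            // Clean-up: timers still pending belong to later elements, which never
            // look back further than elementTs - N.
            state.keySet().removeIf(k -> k < elementTs - n);
        }
        return results;
    }

    public static void main(String[] args) {
        TimerLookback w = new TimerLookback(1000);
        w.onElement(0, 1.0);
        w.onElement(500, 2.0);
        w.onElement(1500, 4.0);
        w.onElement(1000, 8.0); // out of order, but not late
        System.out.println(w.onWatermark(2000)); // timers 1000, 1500, 2000 fire
        System.out.println(w.onWatermark(2500)); // timer 2500 fires
    }
}
```

Because results are emitted only once the watermark passes, the out-of-order element stamped 1000 is still counted for the element stamped 1500 (sum 14.0), at the price of delaying every result by N. Registering the timer at ts instead of ts + N would still wait for the watermark but emit sooner, with the same clean-up rule.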
>>
>> For an example you could look at [1].
>>
>> I hope this helps,
>> Kostas
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> On Tue, Feb 11, 2020 at 11:18 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>> >
>> > Can you do
>> > RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
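This suggestion is a SQL OVER window. A minimal semantic model of what such a frame contains, assuming a hypothetical `events` table with `user_id`, `rowtime` and `amount` columns (names made up for illustration): for every row, sum the rows of the same partition whose rowtime lies within the preceding second. The `PARTITION BY` clause is what should let such a query run in parallel per key. The model is a quadratic reference in plain Java, not how Flink evaluates the query, and it ignores watermarks.

```java
import java.util.ArrayList;
import java.util.List;

// Semantic model only. It mirrors this hypothetical Flink SQL query:
//
//   SELECT user_id, amount,
//          SUM(amount) OVER (
//            PARTITION BY user_id ORDER BY rowtime
//            RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)
//   FROM events
class RangeOverModel {
    static final class Row {
        final String userId;
        final long rowtimeMillis;
        final double amount;

        Row(String userId, long rowtimeMillis, double amount) {
            this.userId = userId;
            this.rowtimeMillis = rowtimeMillis;
            this.amount = amount;
        }
    }

    // One output per input row: the sum over rows of the same partition whose
    // rowtime lies in [rowtime - precedingMillis, rowtime]. Rows with an equal
    // rowtime (peers) are included, which is what RANGE means for CURRENT ROW.
    static List<Double> rangeSums(List<Row> rows, long precedingMillis) {
        List<Double> sums = new ArrayList<>();
        for (Row current : rows) {
            double sum = 0.0;
            for (Row other : rows) {
                boolean samePartition = other.userId.equals(current.userId);
                boolean inRange = other.rowtimeMillis >= current.rowtimeMillis - precedingMillis
                        && other.rowtimeMillis <= current.rowtimeMillis;
                if (samePartition && inRange) {
                    sum += other.amount;
                }
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("a", 0, 1.0),
                new Row("b", 0, 10.0),
                new Row("a", 500, 2.0),
                new Row("a", 500, 3.0),
                new Row("a", 1600, 4.0));
        System.out.println(rangeSums(rows, 1000)); // [1.0, 10.0, 6.0, 6.0, 4.0]
    }
}
```

The two rows of partition "a" stamped 500 both get 6.0, because each one's frame also covers its peer.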
>> >
>> > On Tue, Feb 11, 2020 at 12:15 PM oleg <olegpel...@gmail.com> wrote:
>> >>
>> >> Hi Community,
>> >>
>> >> I do streaming in event time and I want to preserve ordering and late
>> >> events. I have a use case where I need to fire an aggregation function
>> >> over the events of the last n seconds (time units in general) for every
>> >> incoming event.
>> >>
>> >> It seems to me that windowing is not suitable, since windows are defined
>> >> either by time or by event count, not as "the last n seconds for each
>> >> single event".
>> >>
>> >> Is there an idiomatic way to do this? Any examples or help are
>> >> appreciated. Thanks in advance.
>> >>
>> >>
>> >> Best regards,
>> >>
>> >> Oleg Bonar
>> >>
