Hi Eugene,

I agree with Jan. Using a ProcessFunction is the way to go.

ProcessFunction gives you all the tools you need:
* ListState, which is very cheap to append to (and you only need to read the
ListState when you receive a watermark).
* Access to event timestamps, the current watermark and timers.

The ProcessFunction logic is rather easy to implement and should be much
easier to reason about than sorting and firing logic that is spread across a
window assigner, trigger, and window function.
Moreover, you would not need to extend the WindowAssignerContext.
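For illustration, here is a minimal sketch of that logic in plain Java, so it runs without a Flink dependency. SortBuffer, Event, onElement and onWatermark are hypothetical names: the ArrayList stands in for ListState on a keyed stream, and onWatermark stands in for an event-time timer firing. In an actual job, the append would happen in processElement, a timer would be registered via ctx.timerService().registerEventTimeTimer(...), and the sort-and-emit would happen in onTimer, writing the leftovers back with ListState.update(...).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortBuffer {
    /** Hypothetical event: a payload plus its event-time timestamp. */
    public record Event(long timestamp, String payload) {}

    // Stands in for ListState<Event>: appending never reads the buffer.
    private final List<Event> buffer = new ArrayList<>();

    /** Plays the role of processElement: just append the element. */
    public void onElement(Event e) {
        buffer.add(e);
    }

    /**
     * Plays the role of onTimer for an event-time timer: read the buffer once,
     * return all events with timestamp <= watermark sorted by timestamp,
     * and keep the remaining events buffered.
     */
    public List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event e : buffer) {
            if (e.timestamp() <= watermark) {
                ready.add(e);
            } else {
                pending.add(e);
            }
        }
        ready.sort(Comparator.comparingLong(Event::timestamp));
        buffer.clear();
        buffer.addAll(pending); // in Flink this would be ListState.update(pending)
        return ready;
    }

    /** Number of events still waiting for the watermark to pass them. */
    public int pendingCount() {
        return buffer.size();
    }

    public static void main(String[] args) {
        SortBuffer sb = new SortBuffer();
        sb.onElement(new Event(5, "e"));
        sb.onElement(new Event(2, "b"));
        sb.onElement(new Event(9, "i"));
        sb.onElement(new Event(1, "a"));
        System.out.println(sb.onWatermark(5).stream().map(Event::payload).toList()); // prints [a, b, e]
        System.out.println(sb.pendingCount()); // prints 1
    }
}
```

Appending never touches the existing state; the full scan only happens when the watermark advances. If rescanning the pending elements on every watermark becomes too expensive, a state organized by timestamp (like the sorted map state Jan mentions) would avoid it.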

Best, Fabian


On Mon, 17 June 2019 at 15:40, Евгений Юшин <evgenij.us...@gmail.com> wrote:

> Hi Jan
>
> Thanks for the quick reply.
>
> Doing a stateful transformation requires rewriting logic that Flink
> already implements itself. Let's consider the example from my original
> message:
> There can be out-of-order data -> data should be propagated to the next
> operator only when the watermark crosses the out-of-order boundaries ->
> all records with 'ts < watermark' should be pre-processed (e.g. sorted).
>
> Stateful function: all records have to be stored in state, and for each
> new record the whole state has to be traversed to find out whether
> out-of-order events can be propagated further. For unioned streams there
> would have to be logic that takes the min ts of each stream for
> comparison, but the information about which record came from which
> stream is already lost. The state has to be persisted, which adds some
> footprint during checkpoints.
> Flink windows handle all these duties under the hood.
>
> So I think the Flink window interface (a merging window for this
> particular case) is a perfect fit for this kind of task, where
> pre-processing has to be done first.
>
>
>
> On Mon, 17 Jun 2019 at 11:35, Jan Lukavský <je...@seznam.cz> wrote:
>
> > Hi Eugene,
> >
> > I'd say that what you essentially want is not "sort in windows", because
> > (as you mention) you want to emit elements from windows as soon as the
> > watermark passes some timestamp. Maybe a better approach would be to
> > implement this using stateful processing, where you keep a buffer of
> > (unsorted) inputs, set up a timer for the minimal timestamp of the
> > elements in the buffer (plus allowed lateness), and then sort the
> > elements with timestamp <= the timer (very often a single element). I'm
> > actually working on this for Apache Beam (design doc [1]), but this is
> > still a work in progress.
> >
> > Another drawback is that something like "sorted map state" will probably
> > be needed in order to efficiently query the state for minimal timestamp.
> > A less efficient implementation might work with ListState as well.
> >
> > Jan
> >
> > [1]
> > https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
> >
> > On 6/14/19 3:58 PM, Евгений Юшин wrote:
> > > Hi folks
> > >
> > > I want to sort a stream based on an event-time field derived from the
> > > events. To do this I can use one of the existing windows, like
> > > TimeWindow, to collect events in a window of a particular size, or
> > > SlidingWindow to run the sort logic more often (and sort within each
> > > slide).
> > > Ideally, I want to sort events as soon as they pass the watermark (with
> > > an out-of-order ts extractor). None of the current windows allows me to
> > > do this, so I am thinking of implementing a custom merging window
> > > similar to SlidingWindow. Each element would be assigned to
> > > Window(event_ts, event_ts+1), and then all windows with
> > > 'start < watermark' would be merged.
> > > To implement this I need the time service available in
> > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L232
> > > Unfortunately, only 'getCurrentProcessingTime' is available there for
> > > now.
> > >
> > > I could pass a timestamp-extraction function to my new window
> > > extractor, but in this case the logic for calculating the minimum
> > > watermark for parallel/unioned/co-joined streams simply won't work.
> > >
> > > @devs, would you mind if I extend WindowAssignerContext with
> > > getCurrentWatermark, or with a reference to the whole time service?
> > >
> > > I would be really glad to hear your concerns.
> > >
> > > Regards,
> > > Eugene
> > >
> >
>
